modulerize chloe_distributed
arabidopsis committed Mar 9, 2024
1 parent 2034e0d commit 2a0fc31
Showing 9 changed files with 69 additions and 55 deletions.
18 changes: 9 additions & 9 deletions DEV.md
@@ -115,9 +115,9 @@ r = fetch(@spawnat :any annotate(db, "testfa/NC_020019.1.fa"))
Running the chloe server. In a terminal type:

```bash
julia -t 8 --project=. distributed.jl --level=info --workers=4 --broker=default
julia -t8 --project=. distributed.jl --level=info --workers=4 --broker=default --reference=/path/to/chloe_references
# *OR*
julia -t 8 --project=. -e 'using Chloe; distributed_main()' -- --level=info --workers=4 --broker=default
julia -t8 --project=. -e 'using Chloe; distributed_main()' -- --level=info --workers=4 --broker=default --reference=/path/to/chloe_references
```

(Julia as of 1.4 refuses to use more threads than the number of CPUs on your machine:
@@ -129,16 +129,16 @@ In another terminal start julia:
using JuliaWebAPI
import Chloe

i = APIInvoker(Chloe.ZMQ_CLIENT);
i = APIInvoker(Chloe.ZMQ_ENDPOINT);
apicall(i, "ping") # ping the server to see if it is listening.

# fasta and output should be relative to the server's
# working directory, or specify absolute path names! yes "chloe"
# should be "annotate" but...
fastafile = "testfa/NC_020019.1.fa"
ret = apicall(i, "chloe", fastafile, outputfile)
ret = apicall(i, "chloe", fastafile)
code, data = ret["code"], ret["data"]
@assert code === 200
@assert code == 200
# actual filename written and total elapsed
# time in ms to annotate
sff_fname, elapsed_ms = data["filename"], data["elapsed"]
@@ -151,16 +151,16 @@ The *actual* production configuration uses `distributed.jl`
the server as a client of a DEALER/ROUTER server
(see `bin/broker.py` or `src/dist/broker.jl` and the `Makefile`). It *connects* to the
DEALER end on `tcp://127.0.0.1:9467`. The
[chloe website](https://chloe.plantenergy.edu.au)
connects to `ipc:///tmp/chloe4-client` which
[chloe website](https://chloe.plastid.org)
connects to `ipc:///tmp/chloe5-client` which
is the ROUTER end of broker. In this setup
you can run multiple chloe servers connecting
to the same DEALER.

**Update**: you can now run a broker with julia as `julia --project=. src/dist/broker.jl`
*or* specify `--broker=URL` to `distributed.jl`. No
python required. (best to use `--broker=default` to select
this project's default endpoint (`Chloe.ZMQ_CLIENT`))
this project's default endpoint (`Chloe.ZMQ_ENDPOINT`))


## Running Remotely
@@ -196,7 +196,7 @@ i = APIInvoker("tcp://127.0.0.1:9467")
fasta = read("testfa/NC_020019.1.fa", String)
ret = apicall(i, "annotate", fasta)
code, data = ret["code"], ret["data"]
@assert code === 200
@assert code == 200
sff = data["sff"] # sff file as a string
# terminate the server
apicall(i, "exit")
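
A note on the `===` to `==` change in the assertions above. The diff does not say why it was made; one plausible reason, assumed here, is that the decoded JSON status may not always be an `Int64`. The following minimal sketch uses made-up values (`code_int`, `code_float`) rather than a real server response:

```julia
# Hypothetical decoded status codes: the same value as an Int and as a Float64.
code_int = 200
code_float = 200.0

# `==` compares values, so both forms of the status code pass.
@show code_int == 200
@show code_float == 200

# `===` also requires identical types, so a Float64 never matches an Int literal.
@show code_float === 200
```

With `==` the assertion depends only on the numeric value, which is usually what a client-side status check wants.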
35 changes: 23 additions & 12 deletions RECIPES.md
@@ -1,27 +1,38 @@
# Recipes

## Installing Julia

Follow the directions at the link to install [`juliaup`](https://julialang.org/downloads/).

## Creating a Julia Project

```bash
```sh
# create a Julia project in directory myproject
julia -e 'using Pkg; Pkg.generate("myproject")'
cd myproject
# add Chloe to the project
julia --project=. -e 'using Pkg; Pkg.add("https://github.com/ian-small/chloe.git")'
# get Chloe database (This can be placed anywhere you want really)
```
Get the Chloe database (This can be placed anywhere you want really)

```sh
git clone https://github.com/ian-small/chloe_references
# *OR* use julia
julia -e 'import Pkg; Pkg.GitTools.clone(stdout, "https://github.com/ian-small/chloe_references", "chloe_references")'
```

## CommandLine

Annotate a fasta file from the command line.
Annotate fasta files from the command line.

```bash
```sh
# note the '--'
julia --project=. -e 'using Chloe; chloe_main()' -- annotate NC_011032.1.fa
julia --project=. -e 'using Chloe; chloe_main()' -- \
annotate --reference=/path/to/chloe_references NC_011032.1.fa NC_011713.2.fa
```
This will annotate files one-by-one.

## Simple
## Using the Julia REPL

Annotate a single FASTA file.

@@ -31,7 +42,7 @@ import Chloe
# import Logging
# Logging.disable_logging(Logging.Info) # Disable debug and info

references = Chloe.ReferenceDbFromDir("chloe_references")
references = Chloe.ReferenceDbFromDir("/path/to/chloe_references")

outfile, uid = Chloe.annotate(references, "NC_011032.1.fa")

@@ -42,7 +53,7 @@ Write to buffer instead of to a file.

```julia
import Chloe
references = Chloe.ReferenceDbFromDir("chloe_references")
references = Chloe.ReferenceDbFromDir("/path/to/chloe_references")
io, uid = Chloe.annotate(references, "NC_011032.1.fa", nothing, IOBuffer())
# show .sff content
println(String(take!(io)))
@@ -53,12 +64,12 @@ Read from an already open fasta file.

```julia
import Chloe
references = Chloe.ReferenceDbFromDir("chloe_references")
references = Chloe.ReferenceDbFromDir("/path/to/chloe_references")
outfile, uid = open("NC_011032.1.fa", "r") do io
Chloe.annotate(references, io)
end
```
## Distributed
## [Distributed](https://docs.julialang.org/en/v1/stdlib/Distributed/index.html)

It's easy to annotate multiple fasta files in parallel

@@ -72,7 +83,7 @@ addprocs(2)
# to quieten Chloe set the logging level:
# import Logging
# Logging.disable_logging(Logging.Info) # Disable debug and info
references = Chloe.ReferenceDbFromDir("chloe_references")
references = Chloe.ReferenceDbFromDir("/path/to/chloe_references")
end

fasta_directory = "fastas"
@@ -94,7 +105,7 @@ using Distributed
addprocs(4)
@everywhere begin
using Chloe
references = ReferenceDbFromDir("~/chloe_references")
references = ReferenceDbFromDir("/path/to/chloe_references")
end
r = fetch(@spawnat :any annotate(references, "NC_011032.1.fa"))
println(r)
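
The last recipe replaces `"~/chloe_references"` with an explicit path. Julia's file functions do not expand `~` on their own; whether `ReferenceDbFromDir` calls `expanduser` internally is not visible in this diff, so the sketch below treats that as unknown and only demonstrates the Base behaviour:

```julia
raw = "~/chloe_references"

# `expanduser` substitutes the current user's home directory for a leading "~".
expanded = expanduser(raw)

# Without expansion, "~" is treated as an ordinary directory name relative to the cwd.
@show isdir(raw)
@show expanded
```

Passing `expanduser("~/chloe_references")` or an absolute path avoids depending on how the callee treats `~`.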
2 changes: 1 addition & 1 deletion chloe.jl
@@ -1,4 +1,4 @@
if abspath(PROGRAM_FILE) == @__FILE__
import Chloe
Chloe.cmd_main()
Chloe.chloe_main(ARGS)
end
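
The guard in `chloe.jl` runs the entry point only when the file is executed as a script, and the new call passes `ARGS` explicitly. A minimal sketch of the same pattern, with a hypothetical stand-in `demo_main` in place of `Chloe.chloe_main` (whose real signature is not shown in this file):

```julia
# Hypothetical entry point; a stand-in for `Chloe.chloe_main`.
function demo_main(args::Vector{String}=ARGS)
    isempty(args) && return "usage: demo <command> [files...]"
    command, files = args[1], args[2:end]
    return "$command on $(length(files)) file(s)"
end

# Run only when this file is executed directly, not when it is `include`d.
if abspath(PROGRAM_FILE) == @__FILE__
    println(demo_main(ARGS))
end
```

Taking the argument vector as a parameter also makes the entry point callable from the REPL or from tests, for example `demo_main(["annotate", "a.fa"])`.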
10 changes: 4 additions & 6 deletions src/Chloe.jl
@@ -1,25 +1,23 @@
module Chloe

export annotate, annotate_batch, ReferenceDbFromDir, ReferenceDb, ChloeConfig, AbstractReferenceDb
# export MayBeIO, MayBeString
export annotate, annotate_batch, ReferenceDbFromDir, ReferenceDb, ChloeConfig, AbstractReferenceDb, annotate_one_task
export chloe_main
export distributed_main, chloe_distributed, run_broker, broker_main, get_distributed_args, maybe_launch_broker
export distributed_main, broker_main
export set_global_logger
export annotate_one_task
export ZMQ_CLIENT
export ZMQ_ENDPOINT

include("annotate_genomes.jl")

include("dist/ZMQLogger.jl")
include("dist/broker.jl")
include("dist/WebAPI.jl")
include("dist/chloe_cmd.jl")
include("dist/tasks.jl")
include("dist/chloe_distributed.jl")

# import .ChloeDistributed: distributed_main, chloe_distributed, run_broker, get_distributed_args, maybe_launch_broker
import .Annotator: annotate_batch, annotate, ReferenceDb, ReferenceDbFromDir, AbstractReferenceDb, ChloeConfig
import .CmdLine: chloe_main
import .Broker: broker_main
import .ZMQLogging: set_global_logger
import .ChloeDistributed: distributed_main, ZMQ_ENDPOINT, annotate_one_task
end
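
The single-dot imports in `src/Chloe.jl` and the double-dot imports inside the new `ChloeDistributed` module follow Julia's relative module paths. A minimal sketch with made-up module names (`Outer`, `Helpers`, `Worker` are illustrative only, not part of Chloe):

```julia
module Outer                      # plays the role of `Chloe`

module Helpers                    # plays the role of a sibling such as `Annotator`
greet(name) = "hello $name"
end

module Worker                     # plays the role of `ChloeDistributed`
export run_task
# `..Helpers` means "Helpers in my parent module";
# `.Helpers` would look for a submodule of Worker instead.
import ..Helpers: greet
run_task(name) = greet(name) * " from Worker"
end

# At the parent level one dot is enough, because Worker is a direct child.
import .Worker: run_task
export run_task

end # module Outer

using .Outer
println(run_task("chloe"))
```

This is why code that moves from the top level of a package into a submodule needs its sibling imports changed from `.X` to `..X`.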
2 changes: 1 addition & 1 deletion src/dist/broker.jl
@@ -88,7 +88,7 @@ function broker_args(args::Vector{String}=ARGS)
"--client"
arg_type = String
metavar = "URL"
default = ZMQ_CLIENT
default = ZMQ_ENDPOINT
help = "ZMQ ROUTER address to connect to"
end

39 changes: 22 additions & 17 deletions src/dist/chloe_distributed.jl
@@ -1,3 +1,6 @@
module ChloeDistributed
export distributed_main, annotate_one_task

import Distributed
import Distributed: addprocs, rmprocs, @spawnat, @everywhere, nworkers

@@ -10,19 +13,20 @@ import Crayons: @crayon_str
import StringEncodings: encode
import ZMQ

import .WebAPI: TerminatingJSONMsgFormat
import .Annotator: MayBeString, verify_refs, ChloeConfig, default_gsrefsdir
import .ZMQLogging: set_global_logger
import .Broker: check_endpoints, remove_endpoints
import ..WebAPI: TerminatingJSONMsgFormat
import ..Annotator: MayBeString, verify_refs, ChloeConfig, default_gsrefsdir
import ..ZMQLogging: set_global_logger
import ..Broker: check_endpoints, remove_endpoints

include("../globals.jl")
include("dist_globals.jl")

include("tasks.jl")
function git_version()
repo_dir = dirname(@__FILE__)
try
# older version of git don't have -C
# strip(read(pipeline(`git -C "$REPO_DIR" rev-parse HEAD`, stderr=devnull), String))
strip(read(pipeline(`sh -c 'cd "$REPO_DIR" && git rev-parse HEAD'`, stderr=devnull), String))
# strip(read(pipeline(`git -C "$repo_dir" rev-parse HEAD`, stderr=devnull), String))
strip(read(pipeline(`sh -c 'cd "$repo_dir" && git rev-parse HEAD'`, stderr=devnull), String))
catch e
"unknown"
end
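
One caveat about `git_version`: as far as I understand Julia's command literals, text inside single quotes is not interpolated, so `$repo_dir` in the `sh -c '...'` form would reach the shell as a shell variable rather than as the Julia value. The sketch below is a hypothetical alternative, not the code in this commit; it sets the working directory through the `dir` keyword of `Cmd`, which needs neither a shell nor `git -C`:

```julia
# Hypothetical variant of `git_version`; the name and default argument are assumptions.
function git_version_sketch(repo_dir::AbstractString=@__DIR__)
    try
        # Run git with `repo_dir` as its working directory.
        cmd = Cmd(`git rev-parse HEAD`; dir=repo_dir)
        String(strip(read(pipeline(cmd, stderr=devnull), String)))
    catch
        # git missing, directory missing, or not a repository
        "unknown"
    end
end

println(git_version_sketch())
```

The fallback to `"unknown"` mirrors the behaviour of the function in the diff.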
@@ -113,7 +117,7 @@ function chloe_distributed(; gsrefsdir="default", address=ZMQ_WORKER,
procs = filter(w -> w != 1, Distributed.workers())
toadd = workers - length(procs)
if toadd > 0
addprocs(toadd; topology=:master_worker, exeflags="--project=$(pwd())")
addprocs(toadd; topology=:master_worker, exeflags="--project=$(Base.active_project())")
end

procs = filter(w -> w != 1, Distributed.workers())
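
The `exeflags` change swaps the current working directory for the active project. A small sketch of the difference, using only Base; the fallback to `pwd()` when no project is active is an assumption of this sketch, not something the commit does:

```julia
# Path of the active Project.toml, or `nothing` if no project is active.
project_file = Base.active_project()
cwd = pwd()

println("active project: ", project_file)
println("working dir:    ", cwd)

# Build the worker flag; fall back to the cwd only if there is no active project.
exeflags = "--project=$(something(project_file, cwd))"
println(exeflags)
```

Using the active project means workers load the same environment as the master process even when the server is started from a different directory.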
@@ -148,21 +152,21 @@ function chloe_listen(address::String, broker::MayBeString, arm_procs::Function)
# GC.gc()
nannotations = 0

function chloe(fasta::String, outputsff::MayBeString, task_id::MayBeString=nothing, config::Union{Nothing,Dict{String,V} where V<:Any}=nothing)
function chloe(fasta::String, outputsff::MayBeString=nothing, config::Union{Nothing,Dict{String,V} where V<:Any}=nothing, task_id::MayBeString=nothing)
start = now()
cfg = if isnothing(config)
ChloeConfig()
else
ChloeConfig(config)
end
filename, target_id = fetch(@spawnat :any Chloe.annotate_one_task(fasta, outputsff, task_id, cfg))
filename, target_id = fetch(@spawnat :any Main.Chloe.annotate_one_task(fasta, outputsff, task_id, cfg))
elapsed = now() - start
@info success("finished $target_id after $elapsed")
nannotations += 1
return Dict("elapsed" => toms(elapsed), "filename" => filename, "ncid" => string(target_id), "config" => cfg)
end

function batch_annotate(directory::String, task_id::MayBeString=nothing, config::Union{Nothing,Dict{String,V} where V<:Any}=nothing)
function batch_annotate(directory::String, config::Union{Nothing,Dict{String,V} where V<:Any}=nothing, task_id::MayBeString=nothing)
start = now()
cfg = if isnothing(config)
ChloeConfig()
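
The handlers `chloe`, `batch_annotate`, and `annotate` swap the positions of `config` and `task_id`. Since the arguments are positional, clients that pass both would need to update their call order. A minimal sketch with stand-in functions; `MayBe`, `old_annotate`, `new_annotate`, and the `"sensitivity"` key are illustrative assumptions:

```julia
const MayBe{T} = Union{Nothing,T}   # stand-in for the package's MayBeString-style aliases

# Old order: (fasta, task_id, config). New order: (fasta, config, task_id).
old_annotate(fasta::String, task_id::MayBe{String}=nothing, config::MayBe{Dict}=nothing) =
    (; fasta, task_id, config)
new_annotate(fasta::String, config::MayBe{Dict}=nothing, task_id::MayBe{String}=nothing) =
    (; fasta, task_id, config)

cfg = Dict("sensitivity" => 0.5)    # hypothetical config payload

# With the new order a config can be passed without a placeholder task id.
r = new_annotate(">seq\nACGT", cfg)
@show r.config

# The same positional call against the old order does not dispatch.
err = try
    old_annotate(">seq\nACGT", cfg)
catch e
    e
end
@show err isa MethodError
```

Putting `config` first suits callers that set options but leave the task id to the server.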
@@ -181,7 +185,7 @@ function chloe_listen(address::String, broker::MayBeString, arm_procs::Function)
read(encode(fasta, "latin1") |> IOBuffer |> GzipDecompressorStream, String)
end

function annotate(fasta::String, task_id::MayBeString=nothing, config::Union{Nothing,Dict{String,V} where V<:Any}=nothing)
function annotate(fasta::String, config::Union{Nothing,Dict{String,V} where V<:Any}=nothing, task_id::MayBeString=nothing)
start = now()
if startswith(fasta, "\u1f\u8b")
# assume latin1 encoded binary gzip file
@@ -195,7 +199,7 @@ function chloe_listen(address::String, broker::MayBeString, arm_procs::Function)
ChloeConfig(config)
end
input = IOBuffer(fasta)
io, target_id = fetch(@spawnat :any Chloe.annotate_one_task(input, task_id, cfg))
io, target_id = fetch(@spawnat :any Main.Chloe.annotate_one_task(input, task_id, cfg))
sff = String(take!(io))
elapsed = now() - start
@info success("finished $target_id after $elapsed")
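
The `startswith(fasta, "\u1f\u8b")` check in the `annotate` handler relies on gzip data arriving as a latin1-decoded string. The sketch below uses only Base to show why that prefix test identifies a gzip stream; the helper names `latin1_decode` and `latin1_encode` are made up here, and the real code uses `StringEncodings.encode` and `GzipDecompressorStream` instead:

```julia
# Latin-1 maps each byte 0x00-0xFF to the code point with the same value.
latin1_decode(bytes::Vector{UInt8}) = join(Char.(bytes))
latin1_encode(s::AbstractString) = UInt8[UInt8(c) for c in s]

# A gzip stream starts with the magic bytes 0x1f 0x8b.
gzip_header = UInt8[0x1f, 0x8b, 0x08, 0x00]
payload = latin1_decode(gzip_header)

# The same prefix test the handler uses.
@show startswith(payload, "\u1f\u8b")

# Encoding back to latin1 recovers the original bytes for decompression.
@show latin1_encode(payload) == gzip_header
```

This round trip is what lets binary gzip content travel inside a JSON string field.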
@@ -282,7 +286,7 @@ function chloe_listen(address::String, broker::MayBeString, arm_procs::Function)
# add workers
@async begin
# ensure topology is the same
added = addprocs(n, topology=:master_worker, exeflags="--project=$(pwd())")
added = addprocs(n, topology=:master_worker, exeflags="--project=$(Base.active_project())")
arm_procs(added)
@info "added $(added) processes"
# update globals
@@ -411,7 +415,7 @@ function get_distributed_args(args::Vector{String}=ARGS)

end

function run_broker(worker::String=ZMQ_WORKER, client::String=ZMQ_CLIENT)
function run_broker(worker::String=ZMQ_WORKER, client::String=ZMQ_ENDPOINT)
# see https://discourse.julialang.org/t/how-to-run-a-process-in-background-but-still-close-it-on-exit/27231
# src = dirname(@__FILE__)
julia = joinpath(Sys.BINDIR, "julia")
@@ -424,7 +428,7 @@ function run_broker(worker::String=ZMQ_WORKER, client::String=ZMQ_CLIENT)
Base.exit(1) # can we throw....
end
# cmd = `$julia --project=$(pwd()) -q --startup-file=no "$src/broker.jl" --worker=$worker --client=$client`
cmd = `$julia --project=$(Base.active_project()) -q --startup-file=no -e "import Chloe; Chloe.broker_main()" -- --worker=$worker --client=$client`
cmd = `$julia --project="$(Base.active_project())" -q --startup-file=no -e "import Chloe; Chloe.broker_main()" -- --worker=$worker --client=$client`
# @info "running broker as: $cmd"
# wait = false means stdout,stderr are connected to /dev/null
task = run(cmd; wait=false)
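
The change to `cmd` adds double quotes around the interpolated project path. To my knowledge Julia's command literals do not re-split interpolated values on whitespace, so a path with spaces stays one argument either way. A minimal sketch with a hypothetical path:

```julia
project = "/tmp/my project/Project.toml"   # hypothetical path containing a space

unquoted = `julia --project=$project -q`
quoted = `julia --project="$project" -q`

# Inspect the argument vectors that would be passed to the process.
@show unquoted.exec
@show quoted.exec
```

In this sketch both forms produce the same three arguments, so the quotes mainly make the intent explicit to readers used to shell quoting.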
@@ -449,7 +453,7 @@ function maybe_launch_broker(distributed_args)

if client_url !== nothing
if client_url == "default"
client_url = ZMQ_CLIENT
client_url = ZMQ_ENDPOINT
distributed_args[:broker] = client_url
end
if startswith(client_url, "@")
@@ -477,3 +481,4 @@ function distributed_main(args::Vector{String}=ARGS)
chloe_distributed(; distributed_args...)
end

end # module
6 changes: 3 additions & 3 deletions src/dist/dist_globals.jl
@@ -3,7 +3,7 @@ const LOGLEVELS = Dict("info" => Logging.Info, "debug" => Logging.Debug, "warn"
"error" => Logging.Error)

const ZMQ_WORKER = "tcp://127.0.0.1:9458"
const ZMQ_CLIENT = "ipc:///tmp/chloe4-client"
const ZMQ_BACKEND = "ipc:///tmp/chloe4-backend"
const ZMQ_ENDPOINT = "ipc:///tmp/chloe5-client"
const ZMQ_BACKEND = "ipc:///tmp/chloe5-backend"
# change this if you change the API!
const VERSION = "4.0"
const VERSION = "5.0"
4 changes: 2 additions & 2 deletions src/dist/tasks.jl
@@ -1,7 +1,7 @@

# put these in the global namespace
import .ZMQLogging: annotation_local_storage, set_global_logger, TASK_KEY
import .Annotator: annotate, MayBeIO, MayBeString, ChloeConfig
import ..ZMQLogging: annotation_local_storage, set_global_logger, TASK_KEY
import ..Annotator: annotate, MayBeIO, MayBeString, ChloeConfig


#### these are only used by chloe_distributed ####
8 changes: 4 additions & 4 deletions src/globals.jl
@@ -1,8 +1,8 @@
const REPO_DIR = dirname(@__FILE__)
const CHLOE_REFERENCES = "chloe_references"
const DEFAULT_GSREFS = joinpath(CHLOE_REFERENCES, "gsrefs")
const DEFAULT_NUMGSREFS = 16


const DEFAULT_GSREFS = joinpath("chloe_references", "gsrefs")
const DEFAULT_TEMPLATE = "templates.tsv"
const DEFAULT_NUMGSREFS = 16
const DEFAULT_SENSITIVITY = 0.5

