Skip to content

Commit

Permalink
Inject use messages into ongoing conversations
Browse files Browse the repository at this point in the history
  • Loading branch information
slimslenderslacks committed Oct 20, 2024
1 parent abc2b6e commit a5cc926
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 54 deletions.
18 changes: 6 additions & 12 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,24 @@ RUN --mount=type=cache,target=/nix,from=nixos/nix:2.21.1,source=/nix \
--extra-trusted-substituters "https://cache.iog.io" \
--extra-trusted-public-keys "hydra.iohk.io:f/Ea+s+dFdN+3Y/G+FDgSq+a5NEWhJGzdjvKNGv0/EQ=" \
--show-trace \
--log-format raw \
--log-format bar-with-logs \
build . --out-link /tmp/output/result
cp -R $(nix-store -qR /tmp/output/result) /tmp/nix-store-closure
EOF

FROM scratch

# my convention is that images have only two top-level folders
# /nix/store has all of the software
# /app/result has symbolic links for any entrypoints needed
WORKDIR /app

COPY --from=builder /tmp/nix-store-closure /nix/store
COPY --from=builder /tmp/output/ /app/

COPY ./extractors/registry.edn ./extractors/registry.edn
COPY ./functions/registry.edn ./functions/registry.edn
COPY prompts/docker docker
COPY prompts/lazy_docker lazy_docker

# curl needs the /tmp directory to already exist
# programs like curl needs the /tmp directory to already exist
# what is the right way to manage things like this?
COPY <<EOF /tmp/.blank
empty
EOF

COPY <<EOF /root/.blank
empty
EOF

ENTRYPOINT ["/app/result/bin/entrypoint"]
3 changes: 2 additions & 1 deletion deps.edn
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{:paths ["src" "dev"]
:deps {markdown-clj/markdown-clj {:mvn/version "1.12.1"}
:deps {org.clojure/clojure {:mvn/version "1.11.4"}
markdown-clj/markdown-clj {:mvn/version "1.12.1"}
pogonos/pogonos {:mvn/version "0.2.1"}
dev.weavejester/medley {:mvn/version "1.8.0"}
io.replikativ/hasch {:mvn/version "0.3.94"}
Expand Down
35 changes: 19 additions & 16 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,17 @@
in
{
packages = rec {

# build uber jar
clj = pkgs.clj-nix.mkCljBin {
name = "agent-graph";
projectSrc = ./.;
main-ns = "docker.main";
buildCommand = "clj -T:build uber";
jdkRunner = pkgs.jdk17_headless;
};
deps-cache = pkgs.clj-nix.mk-deps-cache {
lockfile = ./deps-lock.json;
};
graal = pkgs.clj-nix.mkGraalBin {
# lazy lookup of a derivation that will exist
cljDrv = self.packages."${system}".clj;
graalvmXmx = "-J-Xmx8g";
graalvm = pkgs.graalvm-ce;
extraNativeImageBuildArgs = [
"--native-image-info"
"--initialize-at-build-time"
"--enable-http"
"--enable-https"
];
};

# create a minimal java runtime for this uberjar
custom-jdk = pkgs.clj-nix.customJdk {
cljDrv = clj;
jdkBase = pkgs.jdk17_headless;
Expand All @@ -67,15 +55,30 @@
extraJdkModules = ["java.security.jgss" "java.security.sasl" "jdk.crypto.ec"];
};

# create some resources that will need to be copied into the final image
registries = pkgs.stdenv.mkDerivation {
name = "registries";
src = ./.;
installPhase = ''
mkdir -p $out/extractors
mkdir -p $out/functions
cp ./extractors/registry.edn $out/extractors
cp ./functions/registry.edn $out/functions
'';
};

# our application makes calls to the curl binary
# therefore, wrap the custom-jdk in a script with curl in the PATH
entrypoint = pkgs.writeShellScriptBin "entrypoint" ''
export PATH=${pkgs.lib.makeBinPath [pkgs.curl]}
export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt
${custom-jdk}/bin/agent-graph "$@"
'';

# the final entrypoint
default = pkgs.buildEnv {
name = "agent-graph-env";
paths = [ entrypoint ];
paths = [ entrypoint registries ];
};
};

Expand Down
2 changes: 2 additions & 0 deletions graphs/prompts/journals/2024_09_03.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
-v /run/host-services/backend.sock:/host-services/docker-desktop-backend.sock \
-e "DOCKER_DESKTOP_SOCKET_PATH=/host-services/docker-desktop-backend.sock" \
--mount type=volume,source=docker-prompts,target=/prompts \
-e "OPENAI_API_KEY_LOCATION=/root" \
--mount type=bind,source=$HOME/.openai-api-key,target=/root/.openai-api-key \
vonwig/prompts:latest \
run \
Expand All @@ -29,6 +30,7 @@
-v /run/host-services/backend.sock:/host-services/docker-desktop-backend.sock \
-e "DOCKER_DESKTOP_SOCKET_PATH=/host-services/docker-desktop-backend.sock" \
--mount type=volume,source=docker-prompts,target=/prompts \
-e "OPENAI_API_KEY_LOCATION=/root" \
--mount type=bind,source=$HOME/.openai-api-key,target=/root/.openai-api-key \
--mount type=bind,source=$PROMPTS_DIR,target=/app/workdir \
--workdir /app/workdir \
Expand Down
43 changes: 22 additions & 21 deletions src/graph.clj
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
; be merged into the conversation state
; =====================================================

(defn start
(defn start
"create starting messages, metadata, and functions to bootstrap the thread"
[{:keys [prompts] :as opts} _]
(let [c (async/promise-chan)]
Expand All @@ -70,20 +70,20 @@
(async/put! c {:messages [] :done "error"})))
c))

(defn end
(defn end
"merge the :done signal"
[state]
(let [c (async/promise-chan)]
;; this is a normal ending and we try to add a :done key to the state for this
(async/put! c (assoc state :done (:finish-reason state)))
c))

(defn completion
(defn completion
"get the next llm completion"
[state]
(run-llm (:messages state) (:metadata state) (:functions state) (:opts state)))

(defn tool
(defn tool
"make docker container tool calls"
[state]
(let [calls (-> (:messages state) last :tool_calls)]
Expand All @@ -102,7 +102,7 @@

; tool_calls are maps with an id and a function with arguments an name
; look up the full tool definition using the name
(defn sub-graph
(defn sub-graph
"answer a tool call by processing a sub-graph"
[state]
(async/go
Expand All @@ -126,7 +126,7 @@
; edge functions takes state and returns next node
; =====================================================

(defn tool-or-end
(defn tool-or-end
"after a completion, check whether you need to make a tool call"
[state]
(let [finish-reason (-> state :finish-reason)]
Expand All @@ -148,7 +148,7 @@
(defn add-conditional-edges [graph s1 f & [m]]
(assoc-in graph [:edges s1] ((or m identity) f)))

(defn state-reducer
(defn state-reducer
"reduce the state with the change from running a node"
[state change]
(-> state
Expand All @@ -157,20 +157,21 @@

(defn stream
"start streaming a conversation"
[graph]
(async/go-loop
[state {}
node "start"]
(jsonrpc/notify :message {:debug (format "\n-> entering %s\n\n" node)})
;; TODO handling bad graphs with missing nodes
(let [enter-node (get-in graph [:nodes node])
new-state (state-reducer state (async/<! (enter-node state)))]
(if (= "end" node)
new-state
;; TODO check for :done keys and possibly bail
;; transition to the next state
;; TODO handling missing edges
(recur new-state ((get-in graph [:edges node]) new-state))))))
([graph] (stream graph {}))
([graph m]
(async/go-loop
[state m
node "start"]
(jsonrpc/notify :message {:debug (format "\n-> entering %s\n\n" node)})
;; TODO handling bad graphs with missing nodes
(let [enter-node (get-in graph [:nodes node])
new-state (state-reducer state (async/<! (enter-node state)))]
(if (= "end" node)
new-state
;; TODO check for :done keys and possibly bail
;; transition to the next state
;; TODO handling missing edges
(recur new-state ((get-in graph [:edges node]) new-state)))))))

; ============================================================
; this is the graph we tend to use in our experiments thus far
Expand Down
8 changes: 7 additions & 1 deletion src/jsonrpc.clj
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
:else (recur (parse-header line headers))))))
messages))

(defn ^:private write-message [^OutputStream output msg]
(defn write-message [^OutputStream output msg]
(let [content (json/generate-string msg)
content-bytes (.getBytes content "utf-8")]
(locking write-lock
Expand All @@ -106,6 +106,12 @@
:method method
:params params})

(defn request [method params get-id]
{:jsonrpc "2.0"
:method method
:id (get-id)
:params params})

;; message({:debug ""}) - debug messages are often serialized edn but still meant to be streamed
;; message({:content ""}) - meant to be streamed
;; prompts({:messages [{:role "", :content ""}]})
Expand Down
2 changes: 1 addition & 1 deletion src/prompts.clj
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
[])))

(def hub-images
#{"curl" "qrencode" "toilet" "figlet" "gh" "typos" "fzf" "jq" "fmpeg" "pylint" "imagemagick"})
#{"curl" "qrencode" "toilet" "figlet" "gh" "typos" "fzf" "jq" "fmpeg" "pylint" "imagemagick" "graphviz"})

(defn collect-functions
"get either :functions or :tools collection
Expand Down
4 changes: 2 additions & 2 deletions src/registry.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
(set! *warn-on-reflection* true)

(defn- functions-dir []
(dir/get-dir "./functions" "/app/functions"))
(dir/get-dir "./functions" "/app/result/functions" "/app/functions"))

(defn- extractors-dir []
(dir/get-dir "./extractors" "/app/extractors"))
(dir/get-dir "./extractors" "/app/result/extractors" "/app/extractors"))

(defn- get-registry [f]
(when-let [d (f)]
Expand Down
59 changes: 59 additions & 0 deletions src/user_loop.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
(ns user-loop
(:require
[clojure.core.async :as async]
graph
jsonrpc)
(:import
[java.io
BufferedOutputStream
OutputStream
PipedInputStream
PipedOutputStream]))

(declare graph)

(def do-stream (partial graph/stream graph))

(defn start-jsonrpc-loop [f in m]
(let [c (jsonrpc/input-stream->input-chan in {})]
(async/go-loop
[state m]
(let [message (async/<! c)
s (-> message :params :content)]
(println "message content: " s)
(if (some (partial = s) ["exit" "quit" "q"])
state
(recur (async/<! (f state s))))))))

(def counter (atom 0))
(defn get-id [] (swap! counter inc))

(def ^{:private true} start-test-loop
(partial start-jsonrpc-loop (fn [state s]
(async/go
(update state :messages (fnil conj []) s)))))

(defn -create-pipe []
;; Create a PipedInputStream and PipedOutputStream
(let [piped-out (PipedOutputStream.)
piped-in (PipedInputStream. piped-out)
buffered-out (BufferedOutputStream. piped-out)]
[[(fn [s] (jsonrpc/write-message buffered-out s))
(fn [] (.close ^OutputStream buffered-out))]
piped-in]))

(comment
(let [[[w c] in] (-create-pipe)]
(async/go (println "ending: " (async/<! (start-test-loop in {}))))
(w (jsonrpc/request "prompt" {:content "hello"} get-id))
(w (jsonrpc/request "prompt" {:content "hello1"} get-id))
(w (jsonrpc/request "prompt" {:content "exit"} get-id))
(c)))

(comment
;; an input stream is something from which we can read bytes
;; in a jvm, we can create Strings from bytes
;; a byte is 8 bits in java and big-endian by default
;; 8 bits can be stored using two hex digits (0-9, a-f) 00-ff (0-255) 1111 8+4+2+1=15
)

0 comments on commit a5cc926

Please sign in to comment.