Skip to content

Commit

Permalink
context separation
Browse files Browse the repository at this point in the history
contexts
async
multiple chats
  • Loading branch information
frdel committed Sep 3, 2024
1 parent 040a665 commit 68ba6e9
Show file tree
Hide file tree
Showing 28 changed files with 689 additions and 478 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
**/__pycache__/



# Ignore all contents of the virtual environment directory
.venv/*

Expand Down
241 changes: 147 additions & 94 deletions agent.py

Large diffs are not rendered by default.

9 changes: 3 additions & 6 deletions initialize.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import models
from agent import Agent, AgentConfig
from agent import AgentConfig

def initialize():

Expand Down Expand Up @@ -52,9 +52,6 @@ def initialize():
# code_exec_ssh_pass = "toor",
# additional = {},
)

# create the first agent
agent0 = Agent( number = 0, config = config )

# return initialized agent
return agent0
# return config object
return config
61 changes: 61 additions & 0 deletions python/helpers/defer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import asyncio
import threading
from concurrent.futures import Future

class DeferredTask:
def __init__(self, func, *args, **kwargs):
self._loop = asyncio.new_event_loop()
# self._thread = None
self._task = None
self._future = Future()
self._start_task(func, *args, **kwargs)

def _start_task(self, func, *args, **kwargs):
def run_in_thread(loop, func, args, kwargs):
asyncio.set_event_loop(loop)
self._task = loop.create_task(self._run(func, *args, **kwargs))
loop.run_forever()

self._thread = threading.Thread(target=run_in_thread, args=(self._loop, func, args, kwargs))
self._thread.start()

async def _run(self, func, *args, **kwargs):
try:
result = await func(*args, **kwargs)
self._future.set_result(result)
except Exception as e:
self._future.set_exception(e)
finally:
self._loop.call_soon_threadsafe(self._loop.stop)

def is_ready(self):
return self._future.done()

async def result(self, timeout=None):
if self._task is None:
raise RuntimeError("Task was not initialized properly.")

try:
return await asyncio.wait_for(asyncio.wrap_future(self._future), timeout)
except asyncio.TimeoutError:
raise TimeoutError("The task did not complete within the specified timeout.")

def result_sync(self, timeout=None):
try:
return self._future.result(timeout)
except TimeoutError:
raise TimeoutError("The task did not complete within the specified timeout.")

def kill(self):
if self._task and not self._task.done():
self._loop.call_soon_threadsafe(self._task.cancel)

def is_alive(self):
return self._thread.is_alive() and not self._future.done()

def __del__(self):
if self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
if self._thread.is_alive():
self._thread.join()
self._loop.close()
17 changes: 9 additions & 8 deletions python/helpers/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from python.helpers.log import Log

class DockerContainerManager:
def __init__(self, image: str, name: str, ports: Optional[dict[str, int]] = None, volumes: Optional[dict[str, dict[str, str]]] = None):
def __init__(self, logger: Log, image: str, name: str, ports: Optional[dict[str, int]] = None, volumes: Optional[dict[str, dict[str, str]]] = None):
self.logger = logger
self.image = image
self.name = name
self.ports = ports
Expand All @@ -25,9 +26,9 @@ def init_docker(self):
err = format_error(e)
if ("ConnectionRefusedError(61," in err or "Error while fetching server API version" in err):
PrintStyle.hint("Connection to Docker failed. Is docker or Docker Desktop running?") # hint for user
Log.log(type="hint", content="Connection to Docker failed. Is docker or Docker Desktop running?")
self.logger.log(type="hint", content="Connection to Docker failed. Is docker or Docker Desktop running?")
PrintStyle.error(err)
Log.log(type="error", content=err)
self.logger.log(type="error", content=err)
time.sleep(5) # try again in 5 seconds
else: raise
return self.client
Expand All @@ -38,10 +39,10 @@ def cleanup_container(self) -> None:
self.container.stop()
self.container.remove()
print(f"Stopped and removed the container: {self.container.id}")
Log.log(type="info", content=f"Stopped and removed the container: {self.container.id}")
self.logger.log(type="info", content=f"Stopped and removed the container: {self.container.id}")
except Exception as e:
print(f"Failed to stop and remove the container: {e}")
Log.log(type="error", content=f"Failed to stop and remove the container: {e}")
self.logger.log(type="error", content=f"Failed to stop and remove the container: {e}")


def start_container(self) -> None:
Expand All @@ -55,7 +56,7 @@ def start_container(self) -> None:
if existing_container:
if existing_container.status != 'running':
print(f"Starting existing container: {self.name} for safe code execution...")
Log.log(type="info", content=f"Starting existing container: {self.name} for safe code execution...")
self.logger.log(type="info", content=f"Starting existing container: {self.name} for safe code execution...")

existing_container.start()
self.container = existing_container
Expand All @@ -66,7 +67,7 @@ def start_container(self) -> None:
# print(f"Container with name '{self.name}' is already running with ID: {existing_container.id}")
else:
print(f"Initializing docker container {self.name} for safe code execution...")
Log.log(type="info", content=f"Initializing docker container {self.name} for safe code execution...")
self.logger.log(type="info", content=f"Initializing docker container {self.name} for safe code execution...")

self.container = self.client.containers.run(
self.image,
Expand All @@ -77,5 +78,5 @@ def start_container(self) -> None:
)
atexit.register(self.cleanup_container)
print(f"Started container with ID: {self.container.id}")
Log.log(type="info", content=f"Started container with ID: {self.container.id}")
self.logger.log(type="info", content=f"Started container with ID: {self.container.id}")
time.sleep(5) # this helps to get SSH ready
6 changes: 6 additions & 0 deletions python/helpers/errors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import re
import traceback
import asyncio

def handle_error(e: Exception):
# if asyncio.CancelledError, re-raise
if isinstance(e, asyncio.CancelledError):
raise e

def format_error(e: Exception, max_entries=2):
traceback_text = traceback.format_exc()
# Split the traceback into lines
Expand Down
6 changes: 3 additions & 3 deletions python/helpers/knowledge_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def calculate_checksum(file_path: str) -> str:
hasher.update(buf)
return hasher.hexdigest()

def load_knowledge(knowledge_dir: str, index: Dict[str, KnowledgeImport]) -> Dict[str, KnowledgeImport]:
def load_knowledge(logger: Log, knowledge_dir: str, index: Dict[str, KnowledgeImport]) -> Dict[str, KnowledgeImport]:
knowledge_dir = files.get_abs_path(knowledge_dir)


Expand All @@ -49,7 +49,7 @@ def load_knowledge(knowledge_dir: str, index: Dict[str, KnowledgeImport]) -> Dic
kn_files = glob.glob(knowledge_dir + '/**/*', recursive=True)
if kn_files:
print(f"Found {len(kn_files)} knowledge files in {knowledge_dir}, processing...")
Log.log(type="info", content=f"Found {len(kn_files)} knowledge files in {knowledge_dir}, processing...")
logger.log(type="info", content=f"Found {len(kn_files)} knowledge files in {knowledge_dir}, processing...")

for file_path in kn_files:
ext = file_path.split('.')[-1].lower()
Expand Down Expand Up @@ -83,5 +83,5 @@ def load_knowledge(knowledge_dir: str, index: Dict[str, KnowledgeImport]) -> Dic
index[file_key]['state'] = 'removed'

print(f"Processed {cnt_docs} documents from {cnt_files} files.")
Log.log(type="info", content=f"Processed {cnt_docs} documents from {cnt_files} files.")
logger.log(type="info", content=f"Processed {cnt_docs} documents from {cnt_files} files.")
return index
108 changes: 64 additions & 44 deletions python/helpers/log.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,78 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
import json
from typing import Optional, Dict
import uuid


@dataclass
class LogItem:
log: 'Log'
no: int
type: str
heading: str
content: str
kvps: Optional[Dict] = None

guid: str = ""

def __post_init__(self):
self.guid = self.log.guid

def update(self, type: str | None = None, heading: str | None = None, content: str | None = None, kvps: dict | None = None):
if self.guid == self.log.guid:
self.log.update_item(self.no, type=type, heading=heading, content=content, kvps=kvps)

def output(self):
return {
"no": self.no,
"type": self.type,
"heading": self.heading,
"content": self.content,
"kvps": self.kvps
}

class Log:

guid = uuid.uuid4()
version: int = 0
last_updated: int = 0
logs: list = []

def __init__(self, type: str="placeholder", heading: str="", content: str="", kvps: dict|None = None):
self.item = Log.log(type, heading, content, kvps) # create placeholder log item that will be updated

def update(self, type: Optional[str] = None, heading: str|None = None, content: str|None = None, kvps: dict|None = None):
Log.edit(self.item.no, type=type, heading=heading, content=content, kvps=kvps)

@staticmethod
def reset():
Log.guid = uuid.uuid4()
Log.version = 0
Log.last_updated = 0
Log.logs = []

@staticmethod
def log(type: str, heading: str|None = None, content: str|None = None, kvps: dict|None = None):
item = LogItem(len(Log.logs), type, heading or "", content or "", kvps)
Log.logs.append(item)
Log.last_updated = item.no
Log.version += 1
def __init__(self):
self.guid: str = str(uuid.uuid4())
self.updates: list[int] = []
self.logs: list[LogItem] = []

def log(self, type: str, heading: str | None = None, content: str | None = None, kvps: dict | None = None) -> LogItem:
item = LogItem(log=self,no=len(self.logs), type=type, heading=heading or "", content=content or "", kvps=kvps)
self.logs.append(item)
self.updates += [item.no]
return item

@staticmethod
def edit(no: int, type: Optional[str] = None, heading: str|None = None, content: str|None = None, kvps: dict|None = None):
if 0 <= no < len(Log.logs):
item = Log.logs[no]
if type is not None:
item.type = type
if heading is not None:
item.heading = heading
if content is not None:
item.content = content
if kvps is not None:
item.kvps = kvps

Log.last_updated = no
Log.version += 1
else:
raise IndexError("Log item number out of range")

def update_item(self, no: int, type: str | None = None, heading: str | None = None, content: str | None = None, kvps: dict | None = None):
item = self.logs[no]
if type is not None:
item.type = type
if heading is not None:
item.heading = heading
if content is not None:
item.content = content
if kvps is not None:
item.kvps = kvps
self.updates += [item.no]

def output(self, start=None, end=None):
if start is None:
start = 0
if end is None:
end = len(self.updates)

out = []
seen = set()
for update in self.updates[start:end]:
if update not in seen:
out.append(self.logs[update].output())
seen.add(update)

return out



def reset(self):
self.guid = str(uuid.uuid4())
self.updates = []
self.logs = []
5 changes: 3 additions & 2 deletions python/helpers/rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ class CallRecord:
output_tokens: int = 0 # Default to 0, will be set separately

class RateLimiter:
def __init__(self, max_calls: int, max_input_tokens: int, max_output_tokens: int, window_seconds: int = 60):
def __init__(self, logger: Log, max_calls: int, max_input_tokens: int, max_output_tokens: int, window_seconds: int = 60):
self.logger = logger
self.max_calls = max_calls
self.max_input_tokens = max_input_tokens
self.max_output_tokens = max_output_tokens
Expand Down Expand Up @@ -49,7 +50,7 @@ def _wait_if_needed(self, current_time: float, new_input_tokens: int):
wait_time = oldest_record.timestamp + self.window_seconds - current_time
if wait_time > 0:
PrintStyle(font_color="yellow", padding=True).print(f"Rate limit exceeded. Waiting for {wait_time:.2f} seconds due to: {', '.join(wait_reasons)}")
Log.log("rate_limit","Rate limit exceeded",f"Rate limit exceeded. Waiting for {wait_time:.2f} seconds due to: {', '.join(wait_reasons)}")
self.logger.log("rate_limit","Rate limit exceeded",f"Rate limit exceeded. Waiting for {wait_time:.2f} seconds due to: {', '.join(wait_reasons)}")
time.sleep(wait_time)
current_time = time.time()

Expand Down
4 changes: 2 additions & 2 deletions python/helpers/shell_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def __init__(self):
self.process = None
self.full_output = ''

def connect(self):
async def connect(self):
# Start a new subprocess with the appropriate shell for the OS
if sys.platform.startswith('win'):
# Windows
Expand Down Expand Up @@ -44,7 +44,7 @@ def send_command(self, command: str):
self.process.stdin.write(command + '\n') # type: ignore
self.process.stdin.flush() # type: ignore

def read_output(self) -> Tuple[str, Optional[str]]:
async def read_output(self) -> Tuple[str, Optional[str]]:
if not self.process:
raise Exception("Shell not connected")

Expand Down
Loading

0 comments on commit 68ba6e9

Please sign in to comment.