-
Notifications
You must be signed in to change notification settings - Fork 2.8k
/
Copy pathstreamlit.py
1737 lines (1475 loc) · 71.6 KB
/
streamlit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import shutil
import json
import os
import streamlit as st
from subprocess import run, CalledProcessError
from dotenv import load_dotenv
import re
import time
import logging
from typing import Dict, List, Optional, Tuple
from datetime import datetime
import sys
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
# ---------------------------------------------------------------------------
# Logging configuration
# ---------------------------------------------------------------------------
# Console records are colorized with ANSI escape codes; file records are plain.
console_formatter = logging.Formatter(
    '\033[92m%(asctime)s\033[0m - '    # Green timestamp
    '\033[94m%(levelname)s\033[0m - '  # Blue level
    '\033[95m[%(funcName)s]\033[0m '   # Purple function name
    '%(message)s'                      # Regular message
)
file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - [%(funcName)s] %(message)s')

# Module logger. The logger itself must let DEBUG records through; otherwise
# the DEBUG level requested on the file handler below can never take effect.
# Each handler then applies its own threshold (console: INFO, file: DEBUG).
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Detach handlers left over from an earlier execution of this module and close
# them, so a previously opened log file is released instead of leaking.
for _stale_handler in list(logger.handlers):
    logger.removeHandler(_stale_handler)
    _stale_handler.close()

# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(console_formatter)
console_handler.setLevel(logging.INFO)
logger.addHandler(console_handler)

# File handler: one log file per day under ~/.config/fabric/logs
log_dir = os.path.expanduser("~/.config/fabric/logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, f"fabric_ui_{datetime.now().strftime('%Y%m%d')}.log")
# Log messages contain emoji, so the file encoding must not depend on the locale.
file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setFormatter(file_formatter)
file_handler.setLevel(logging.DEBUG)  # More detailed logging in file
logger.addHandler(file_handler)

# Log startup message
logger.info("🚀 Fabric UI Starting Up")
logger.info(f"💾 Log file: {log_file}")

# ---------------------------------------------------------------------------
# Global variables
# ---------------------------------------------------------------------------
pattern_dir = os.path.expanduser("~/.config/fabric/patterns")  # one sub-directory per pattern
MAX_RETRIES = 3   # attempts made by safe_run_command when retrying is enabled
RETRY_DELAY = 1  # seconds
def initialize_session_state():
    """Seed ``st.session_state`` with default values, then load saved outputs.

    Only keys that are missing from the session state are written; values
    that already exist are left untouched. After seeding,
    ``load_saved_outputs()`` is called.
    """
    logger.info("Initializing session state")

    # Model configuration defaults (kept as a nested dict under "config").
    model_config = {
        "vendor": "",
        "model": "",
        "context_length": "2048"
    }

    defaults = {
        # Configuration state
        "config_loaded": False,
        "vendors": {},
        "available_models": [],
        "selected_vendor": None,
        "selected_model": None,
        # Pattern execution state
        "input_content": "",
        "selected_patterns": [],
        "chat_output": [],
        "current_view": "run",
        # Pattern creation state
        "wizard_step": "Basic Info",
        "session_name": "",
        "context_name": "",
        # Model configuration
        "config": model_config,
        # Model caching
        "cached_models": None,
        "last_model_fetch": 0,
        # UI state
        "active_tab": 0,
        # Output management
        "output_logs": [],
        "starred_outputs": [],
        "starring_output": None,
        "temp_star_name": ""
    }

    for name in defaults:
        if name in st.session_state:
            continue
        st.session_state[name] = defaults[name]

    # Load saved outputs if they exist
    load_saved_outputs()
def parse_models_output(output: str) -> Dict[str, List[str]]:
    """Parse the output of the ``fabric --listmodels`` command.

    A line that is neither tab-indented nor starts with ``[`` names a
    provider; tab-indented or ``[n]``-prefixed lines are models belonging to
    the most recent provider. Blank lines and the ``Available models:``
    header are ignored, as are model lines that appear before any provider.

    Args:
        output: Raw stdout of the command.

    Returns:
        Mapping of provider name to its list of model names, in input order.
    """
    logger.debug("Parsing models output")
    providers: Dict[str, List[str]] = {}
    current_provider = None

    for raw_line in output.split('\n'):
        stripped = raw_line.strip()
        if not stripped or stripped == "Available models:":
            continue

        # The indentation test must look at the raw line: once the line is
        # stripped, the leading tab that marks a model entry is gone.
        is_model_line = raw_line.startswith('\t') or stripped.startswith('[')

        if not is_model_line:
            current_provider = stripped
            providers[current_provider] = []
        elif current_provider:
            model = stripped
            # Drop a leading "[n]" index, but leave brackets that are part of
            # the model name itself alone.
            if model.startswith('[') and ']' in model:
                model = model.split(']', 1)[1].strip()
            providers[current_provider].append(model)

    logger.debug(f"Found providers: {list(providers.keys())}")
    return providers
def safe_run_command(command: List[str], retry: bool = True) -> Tuple[bool, str, str]:
    """Run *command* and report ``(success, stdout, stderr)``.

    With ``retry`` enabled the command is attempted up to ``MAX_RETRIES``
    times, pausing ``RETRY_DELAY`` seconds between attempts; otherwise it is
    attempted once. A zero exit status ends the loop immediately. On the last
    permitted attempt the process stderr (or the exception text) is returned
    as the error.
    """
    cmd_str = " ".join(command)
    logger.info(f"Executing command: {cmd_str}")

    attempts_allowed = MAX_RETRIES if retry else 1
    for attempt in range(attempts_allowed):
        # True when no further attempt will follow this one.
        out_of_attempts = (not retry) or attempt == MAX_RETRIES - 1
        try:
            logger.debug(f"Attempt {attempt + 1}/{MAX_RETRIES if retry else 1}")
            result = run(command, capture_output=True, text=True)
            if result.returncode == 0:
                logger.debug("Command executed successfully")
                return True, result.stdout, ""
            if out_of_attempts:
                logger.error(f"Command failed with return code {result.returncode}: {result.stderr}")
                return False, "", result.stderr
        except Exception as e:
            if out_of_attempts:
                logger.error(f"Command execution failed: {str(e)}")
                return False, "", str(e)
        logger.debug(f"Retrying in {RETRY_DELAY} seconds...")
        time.sleep(RETRY_DELAY)

    # Only reachable when the loop body never returned (e.g. zero attempts).
    logger.error("Max retries exceeded")
    return False, "", "Max retries exceeded"
def fetch_models_once() -> Dict[str, List[str]]:
    """Return the provider → models mapping, caching it in the session.

    A cached mapping younger than five minutes is returned as-is. Otherwise
    ``fabric --listmodels`` is executed, its output parsed, and the result
    stored in ``st.session_state`` together with the fetch time. If the
    command fails, the error is shown and an empty dict is returned (the
    cache is left unchanged).
    """
    logger.info("Fetching models")
    cache_timeout = 300  # 5 minutes
    current_time = time.time()

    cached = st.session_state.cached_models
    if cached is not None and current_time - st.session_state.last_model_fetch < cache_timeout:
        logger.debug("Using cached models")
        return st.session_state.cached_models

    logger.debug("Cache expired or not available, fetching new models")
    success, stdout, stderr = safe_run_command(["fabric", "--listmodels"])
    if success:
        providers = parse_models_output(stdout)
        logger.info(f"Found {len(providers)} providers")
        st.session_state.cached_models = providers
        st.session_state.last_model_fetch = current_time
        return providers

    logger.error(f"Failed to fetch models: {stderr}")
    st.error(f"Failed to fetch models: {stderr}")
    return {}
def get_configured_providers() -> Dict[str, List[str]]:
    """Return the configured providers and their models.

    Thin alias for ``fetch_models_once()``, which runs (or reuses the cached
    result of) ``fabric --listmodels``.
    """
    configured = fetch_models_once()
    return configured
def update_provider_selection(new_provider: str) -> None:
    """Switch the active provider and clear any model chosen for the old one.

    Nothing is changed when *new_provider* already equals the configured
    vendor. Otherwise the vendor is updated, the model selection and the
    list of available models are reset, and a ``model_select`` entry in the
    session state is removed if present.
    """
    logger.info(f"Updating provider selection to: {new_provider}")
    state = st.session_state
    if new_provider == state.config["vendor"]:
        return

    logger.debug("Provider changed, resetting model selection")
    state.config["vendor"] = new_provider
    state.selected_vendor = new_provider
    state.config["model"] = None
    state.selected_model = None
    state.available_models = []
    if "model_select" in state:
        del state.model_select
    logger.debug("Model state reset completed")
def load_configuration() -> bool:
    """Load fabric's ``.env`` file and initialise the model configuration.

    Reads ``~/.config/fabric/.env``, queries the configured providers, and
    stores vendor, model and context length in ``st.session_state.config``.
    When ``DEFAULT_VENDOR`` is unset or unknown, the first provider and its
    first model (or ``None`` if it has no models) are used instead.

    Returns:
        True when the configuration was loaded; False when the ``.env`` file
        is missing, no providers are configured, or any error occurred (the
        problem is logged and shown in the UI).
    """
    logger.info("Loading configuration")
    try:
        env_path = os.path.expanduser("~/.config/fabric/.env")
        logger.debug(f"Looking for .env file at: {env_path}")
        if not os.path.exists(env_path):
            logger.error(f"Configuration file not found at {env_path}")
            st.error(f"Configuration file not found at {env_path}")
            return False

        load_dotenv(dotenv_path=env_path)
        logger.debug("Environment variables loaded")

        with st.spinner("Loading providers and models..."):
            providers = get_configured_providers()
        if not providers:
            logger.error("No providers configured")
            st.error("No providers configured. Please run 'fabric --setup' first.")
            return False

        default_vendor = os.getenv("DEFAULT_VENDOR")
        default_model = os.getenv("DEFAULT_MODEL")
        context_length = os.getenv("DEFAULT_MODEL_CONTEXT_LENGTH", "2048")
        logger.debug(f"Default configuration - Vendor: {default_vendor}, Model: {default_model}")

        vendor_usable = bool(default_vendor) and default_vendor in providers
        if not vendor_usable:
            # Fall back to the first provider and, if it has any, its first model.
            default_vendor = next(iter(providers))
            fallback_models = providers[default_vendor]
            if fallback_models:
                default_model = fallback_models[0]
            else:
                default_model = None
            logger.info(f"Using fallback configuration - Vendor: {default_vendor}, Model: {default_model}")

        st.session_state.config = {
            "vendor": default_vendor,
            "model": default_model,
            "context_length": context_length
        }
        st.session_state.vendors = providers
        st.session_state.config_loaded = True
        logger.info("Configuration loaded successfully")
        return True
    except Exception as e:
        logger.error(f"Configuration error: {str(e)}", exc_info=True)
        st.error(f"Configuration error: {str(e)}")
        return False
def load_models_and_providers() -> None:
    """Render the sidebar provider/model selectors and record the choice.

    The provider selector is pre-set to the configured vendor when that
    vendor is available, otherwise to the first provider. Choosing a
    different provider goes through ``update_provider_selection``. The model
    selector uses a provider-specific widget key. On any error the message
    is shown in the sidebar and the model selection is cleared.
    """
    def _on_provider_change():
        # Widget callback: forward the selectbox value stored under its key.
        update_provider_selection(st.session_state.provider_select)

    try:
        sidebar = st.sidebar
        sidebar.header("Model and Provider Selection")

        providers: Dict[str, List[str]] = fetch_models_once()
        if not providers:
            sidebar.error("No providers configured")
            return

        current_vendor = st.session_state.config.get("vendor", "")
        available_providers = list(providers.keys())
        try:
            if current_vendor in available_providers:
                provider_index = available_providers.index(current_vendor)
            else:
                provider_index = 0
        except ValueError:
            provider_index = 0
            logger.warning(f"Current vendor {current_vendor} not found in available providers")

        selected_provider = sidebar.selectbox(
            "Provider",
            available_providers,
            index=provider_index,
            key="provider_select",
            on_change=_on_provider_change
        )
        if selected_provider != st.session_state.config.get("vendor"):
            update_provider_selection(selected_provider)
        sidebar.success(f"Using {selected_provider}")

        available_models = providers.get(selected_provider, [])
        if not available_models:
            sidebar.warning(f"No models available for {selected_provider}")
            return

        current_model = st.session_state.config.get("model")
        try:
            if current_model in available_models:
                model_index = available_models.index(current_model)
            else:
                model_index = 0
        except ValueError:
            model_index = 0
            logger.warning(f"Current model {current_model} not found in available models for {selected_provider}")

        # Provider-specific key so each provider gets its own model widget.
        model_key = f"model_select_{selected_provider}"
        selected_model = sidebar.selectbox(
            "Model",
            available_models,
            index=model_index,
            key=model_key
        )
        if selected_model != st.session_state.config.get("model"):
            logger.debug(f"Updating model selection to: {selected_model}")
            st.session_state.config["model"] = selected_model
            st.session_state.selected_model = selected_model
    except Exception as e:
        logger.error(f"Error loading models and providers: {str(e)}", exc_info=True)
        st.sidebar.error(f"Error loading models and providers: {str(e)}")
        st.session_state.selected_model = None
        st.session_state.config["model"] = None
def get_pattern_metadata(pattern_name: str) -> Optional[str]:
    """Return the contents of a pattern's ``system.md``.

    Args:
        pattern_name: Name of the pattern directory under ``pattern_dir``.

    Returns:
        The file's text, or None when the pattern has no ``system.md``.
    """
    pattern_path = os.path.join(pattern_dir, pattern_name, "system.md")
    try:
        # Read explicitly as UTF-8 so non-ASCII pattern text does not depend
        # on the platform's locale encoding.
        with open(pattern_path, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        # Opening directly avoids the gap between an existence check and the read.
        return None
def get_patterns():
    """List the pattern names found in ``pattern_dir``.

    Every sub-directory of ``pattern_dir`` counts as a pattern. If the
    directory is missing or cannot be listed, the problem is reported
    through ``st.error`` and an empty list is returned.
    """
    if not os.path.exists(pattern_dir):
        st.error(f"Pattern directory not found: {pattern_dir}")
        return []

    try:
        found = []
        for entry in os.listdir(pattern_dir):
            if os.path.isdir(os.path.join(pattern_dir, entry)):
                found.append(entry)
        return found
    except PermissionError:
        st.error(f"Permission error accessing pattern directory: {pattern_dir}")
        return []
    except Exception as e:
        st.error(f"An unexpected error occurred: {e}")
        return []
def create_pattern(pattern_name: str, content: Optional[str] = None) -> Tuple[bool, str]:
    """Create a new pattern directory containing a ``system.md``.

    When *content* is given it is piped through fabric's ``create_pattern``
    pattern (using the currently selected vendor and model) and the result
    is written to ``system.md``. Without content a minimal template is
    written for manual editing. A directory created by this call is removed
    again if any later step fails.

    Args:
        pattern_name: Name of the new pattern; must be a single directory
            name (no path separators, not ``.`` or ``..``).
        content: Optional free-form text to be structured into a pattern.

    Returns:
        ``(True, message)`` on success, ``(False, error_message)`` otherwise.
    """
    new_pattern_path = None
    created_here = False  # only directories made by this call may be cleaned up
    try:
        # Validate pattern name
        if not pattern_name:
            logger.error("Pattern name cannot be empty")
            return False, "Pattern name cannot be empty."
        # Reject anything that would resolve outside pattern_dir.
        if pattern_name in (".", "..") or os.path.basename(pattern_name) != pattern_name:
            logger.error(f"Invalid pattern name: {pattern_name!r}")
            return False, "Invalid pattern name: it must be a single directory name."

        # Check if pattern already exists
        new_pattern_path = os.path.join(pattern_dir, pattern_name)
        if os.path.exists(new_pattern_path):
            logger.error(f"Pattern {pattern_name} already exists")
            return False, "Pattern already exists."

        # Create pattern directory
        os.makedirs(new_pattern_path)
        created_here = True
        logger.info(f"Created pattern directory: {new_pattern_path}")
        system_file = os.path.join(new_pattern_path, "system.md")

        if content:
            # Structure the provided content with fabric's create_pattern.
            logger.info(f"Structuring content for pattern '{pattern_name}' using Fabric")
            try:
                current_provider = st.session_state.config.get("vendor")
                current_model = st.session_state.config.get("model")
                if not current_provider or not current_model:
                    raise ValueError("Please select a provider and model first.")

                cmd = [
                    "fabric", "--pattern", "create_pattern",
                    "--vendor", current_provider,
                    "--model", current_model,
                ]
                logger.debug(f"Running command: {' '.join(cmd)}")
                logger.debug(f"Input content:\n{content}")

                result = run(cmd, input=content, capture_output=True, text=True, check=True)
                structured_content = result.stdout.strip()
                if not structured_content:
                    raise ValueError("No output received from create_pattern")

                # Save the structured content to system.md
                with open(system_file, "w", encoding="utf-8") as f:
                    f.write(structured_content)

                # Validate the created pattern
                is_valid, validation_message = validate_pattern(pattern_name)
                if not is_valid:
                    raise ValueError(f"Pattern validation failed: {validation_message}")
                logger.info(f"Successfully created pattern '{pattern_name}' with structured content")
            except CalledProcessError as e:
                error_msg = f"Error running create_pattern: {e.stderr}"
                logger.error(error_msg)
                shutil.rmtree(new_pattern_path, ignore_errors=True)
                return False, error_msg
            except Exception as e:
                error_msg = f"Unexpected error during content structuring: {str(e)}"
                logger.error(error_msg)
                shutil.rmtree(new_pattern_path, ignore_errors=True)
                return False, error_msg
        else:
            # Create minimal template for manual editing
            logger.info(f"Creating minimal template for pattern '{pattern_name}'")
            with open(system_file, "w", encoding="utf-8") as f:
                f.write("# IDENTITY and PURPOSE\n\n# STEPS\n\n# OUTPUT INSTRUCTIONS\n")
            # A failed validation of the template is only logged.
            is_valid, validation_message = validate_pattern(pattern_name)
            if not is_valid:
                logger.warning(f"Pattern created but validation failed: {validation_message}")

        return True, f"Pattern '{pattern_name}' created successfully."
    except Exception as e:
        error_msg = f"Error creating pattern: {str(e)}"
        logger.error(error_msg)
        # Clean up on any error, but never remove a directory we did not create.
        if created_here and new_pattern_path and os.path.exists(new_pattern_path):
            shutil.rmtree(new_pattern_path, ignore_errors=True)
        return False, error_msg
def delete_pattern(pattern_name):
    """Delete an existing pattern directory.

    Args:
        pattern_name: Name of the pattern; must be a single directory name
            (no path separators, not ``.`` or ``..``).

    Returns:
        ``(True, message)`` when the pattern was removed, otherwise
        ``(False, reason)``.
    """
    try:
        if not pattern_name:
            return False, "Pattern name cannot be empty."
        # "." or ".." would make rmtree wipe the whole pattern directory or
        # its parent, and separators could point anywhere on disk.
        if pattern_name in (".", "..") or os.path.basename(pattern_name) != pattern_name:
            return False, "Invalid pattern name: it must be a single directory name."

        pattern_path = os.path.join(pattern_dir, pattern_name)
        if not os.path.exists(pattern_path):
            return False, "Pattern does not exist."

        shutil.rmtree(pattern_path)
        return True, f"Pattern '{pattern_name}' deleted successfully."
    except Exception as e:
        return False, f"Error deleting pattern: {str(e)}"
def pattern_creation_wizard():
    """Render the UI for creating a new pattern.

    Two modes are offered once a pattern name has been entered:

    * "Simple Editor": one text area whose content is passed to
      ``create_pattern``.
    * "Advanced (Wizard)": one section (IDENTITY, GOAL, OUTPUT, OUTPUT
      INSTRUCTIONS) is edited at a time; the text is stored in
      ``st.session_state`` and the sections are combined when the pattern is
      created. The stored section texts are cleared after a successful
      creation.
    """
    # Newer Streamlit releases provide st.rerun and no longer ship
    # st.experimental_rerun; fall back to the old name on older releases.
    rerun = getattr(st, "rerun", None) or st.experimental_rerun

    st.header("Create New Pattern")
    pattern_name = st.text_input("Pattern Name")
    if not pattern_name:
        st.info("Enter a pattern name to create a new pattern")
        return

    edit_mode = st.radio(
        "Edit Mode",
        ["Simple Editor", "Advanced (Wizard)"],
        key="pattern_creation_edit_mode",
        horizontal=True
    )

    if edit_mode == "Simple Editor":
        new_content = st.text_area("Enter Pattern Content", height=400)
        if st.button("Create Pattern", type="primary"):
            success, message = create_pattern(pattern_name, new_content)
            if success:
                st.success(message)
                rerun()
            else:
                st.error(message)
        return

    # Wizard mode: section title -> session-state key holding its text.
    section_keys = {
        "IDENTITY": "new_pattern_identity",
        "GOAL": "new_pattern_goal",
        "OUTPUT": "new_pattern_output",
        "OUTPUT INSTRUCTIONS": "new_pattern_instructions",
    }
    current_section = st.radio(
        "Edit Section",
        list(section_keys),
        key="pattern_creation_section_select"
    )
    # Only the selected section's text area is rendered on each run.
    # NOTE(review): the text area has no initial value, so returning to a
    # section may start from an empty box again; confirm the intended UX.
    st.session_state[section_keys[current_section]] = st.text_area(
        f"Define the {current_section}", height=200
    )

    pattern_content = "\n".join(
        f"# {title}\n{st.session_state.get(key, '')}"
        for title, key in section_keys.items()
    )

    if st.button("Create Pattern", type="primary"):
        success, message = create_pattern(pattern_name, pattern_content)
        if success:
            st.success(message)
            for key in section_keys.values():
                if key in st.session_state:
                    del st.session_state[key]
            rerun()
        else:
            st.error(message)
def bulk_edit_patterns(patterns_to_edit, field_to_update, new_value):
    """Apply the same edit to the ``system.md`` of several patterns.

    Only ``field_to_update == "purpose"`` is supported: inside the
    "IDENTITY and PURPOSE" section, every line containing
    "You are an AI assistant designed to" is replaced by that phrase followed
    by *new_value* and a period.

    Args:
        patterns_to_edit: Iterable of pattern names under ``pattern_dir``.
        field_to_update: Name of the field to change.
        new_value: Replacement text for the field.

    Returns:
        A list of ``(pattern, success, message)`` tuples, one per pattern, in
        input order.
    """
    marker = "You are an AI assistant designed to"
    results = []
    for pattern in patterns_to_edit:
        try:
            system_file = os.path.join(pattern_dir, pattern, "system.md")
            if not os.path.exists(system_file):
                results.append((pattern, False, "system.md not found"))
                continue
            if field_to_update != "purpose":
                # Nothing to do for this field, so the file is not even read.
                results.append((pattern, False, f"Field {field_to_update} not supported for bulk edit"))
                continue

            # Explicit UTF-8 keeps non-ASCII pattern text intact regardless
            # of the platform's locale encoding.
            with open(system_file, "r", encoding="utf-8") as f:
                content = f.read()

            # Splitting on "#" and re-joining with "#" is lossless, so text
            # outside the target section is written back unchanged.
            updated_sections = []
            for section in content.split("#"):
                if section.strip().startswith("IDENTITY and PURPOSE"):
                    lines = [
                        f"{marker} {new_value}." if marker in line else line
                        for line in section.split("\n")
                    ]
                    section = "\n".join(lines)
                updated_sections.append(section)

            with open(system_file, "w", encoding="utf-8") as f:
                f.write("#".join(updated_sections))
            results.append((pattern, True, "Updated successfully"))
        except Exception as e:
            results.append((pattern, False, str(e)))
    return results
def pattern_creation_ui():
    """Render a single-editor UI for creating a pattern from a template.

    The user enters a name and edits a pre-filled ``system.md`` template.
    On "Create Pattern" the pattern is created through ``create_pattern``
    (which writes a minimal template) and ``system.md`` is then overwritten
    with the edited text.
    """
    pattern_name = st.text_input("Pattern Name")
    if not pattern_name:
        st.info("Enter a pattern name to create a new pattern")
        return

    system_content = (
        "# IDENTITY and PURPOSE\n"
        "You are an AI assistant designed to {purpose}.\n"
        "# STEPS\n"
        "- Step 1\n"
        "- Step 2\n"
        "- Step 3\n"
        "# OUTPUT INSTRUCTIONS\n"
        "- Output format instructions here\n"
    )
    new_content = st.text_area("Edit Pattern Content", system_content, height=400)

    if not st.button("Create Pattern", type="primary"):
        return

    success, message = create_pattern(pattern_name)
    if not success:
        st.error(message)
        return

    system_file = os.path.join(pattern_dir, pattern_name, "system.md")
    try:
        # Explicit UTF-8 so the edited text does not depend on the locale encoding.
        with open(system_file, "w", encoding="utf-8") as f:
            f.write(new_content)
    except OSError as e:
        st.error(f"Error writing pattern content: {str(e)}")
        return

    st.success(f"Pattern '{pattern_name}' created successfully!")
    # Newer Streamlit releases provide st.rerun and no longer ship
    # st.experimental_rerun; fall back to the old name on older releases.
    rerun = getattr(st, "rerun", None) or st.experimental_rerun
    rerun()
def pattern_management_ui():
    """Show the "Pattern Management" title in the sidebar."""
    sidebar = st.sidebar
    sidebar.title("Pattern Management")
def save_output_log(pattern_name: str, input_content: str, output_content: str, timestamp: str):
    """Record one pattern execution in the session log and persist it.

    The new entry starts out unstarred and without a custom name. After it
    is appended to ``st.session_state.output_logs``, ``save_outputs()`` is
    called.
    """
    record = dict(
        timestamp=timestamp,
        pattern_name=pattern_name,
        input=input_content,
        output=output_content,
        is_starred=False,
        custom_name="",
    )
    st.session_state.output_logs.append(record)
    # Save outputs after each new log entry
    save_outputs()
def star_output(log_index: int, custom_name: str = "") -> bool:
    """Star/favorite an output log.

    A copy of the log entry is added to ``st.session_state.starred_outputs``
    and persisted via ``save_outputs()``.

    Args:
        log_index: Index of the output log to star.
        custom_name: Optional custom name for the starred output; defaults to
            "Starred Output #<n>".

    Returns:
        bool: True if the output was starred, False if the index is out of
        range, the output is already starred, or an error occurred.
    """
    try:
        logs = st.session_state.output_logs
        if not 0 <= log_index < len(logs):
            return False

        source_entry = logs[log_index]
        starred = st.session_state.starred_outputs

        # execute_patterns stamps every pattern of one run with the same
        # timestamp, so the timestamp alone cannot tell those outputs apart;
        # the pattern name is part of the identity as well.
        identity = (source_entry.get("timestamp"), source_entry.get("pattern_name"))
        already_starred = any(
            (s.get("timestamp"), s.get("pattern_name")) == identity for s in starred
        )
        if already_starred:
            return False

        log_entry = source_entry.copy()
        log_entry["is_starred"] = True
        log_entry["custom_name"] = custom_name or f"Starred Output #{len(starred) + 1}"
        starred.append(log_entry)
        save_outputs()  # Save after starring
        return True
    except Exception as e:
        logger.error(f"Error starring output: {str(e)}")
        return False
def unstar_output(log_index: int):
    """Drop the starred output at *log_index* and persist the change.

    An index outside the starred list is ignored.
    """
    starred = st.session_state.starred_outputs
    if log_index < 0 or log_index >= len(starred):
        return
    starred.pop(log_index)
    # Save outputs after unstarring
    save_outputs()
def validate_input_content(input_text: str) -> Tuple[bool, str]:
    """Validate input content for potentially problematic characters or patterns.

    Checks, in order: non-empty, at least 2 non-whitespace-trimmed
    characters, encodable as UTF-8, at most 100KB when encoded, at most 30%
    special characters, and no control characters other than tab, newline
    and carriage return.

    Args:
        input_text: The input text to validate.

    Returns:
        Tuple[bool, str]: ``(is_valid, error_message)``; the message is empty
        when the input is valid.
    """
    if not input_text or input_text.isspace():
        return False, "Input content cannot be empty or only whitespace."

    # Check for minimum length
    if len(input_text.strip()) < 2:
        return False, "Input content must be at least 2 characters long."

    # Encode once, up front: text that cannot be encoded (e.g. a lone
    # surrogate) must be reported as invalid rather than raising from the
    # size check below.
    try:
        encoded = input_text.encode('utf-8')
    except UnicodeError:
        return False, "Input contains invalid Unicode characters."

    # Check for maximum length (100KB)
    if len(encoded) > 100 * 1024:
        return False, "Input content exceeds maximum size of 100KB."

    # Check for high concentration of special characters
    special_chars = frozenset('!@#$%^&*()_+[]{}|\\;:\'",.<>?`~')
    special_char_count = sum(1 for c in input_text if c in special_chars)
    if special_char_count / len(input_text) > 0.3:  # More than 30% special characters
        return False, "Input contains too many special characters. Please check your input."

    # Check for control characters (tab, newline and carriage return are allowed)
    if any(ord(c) < 32 and c not in '\t\n\r' for c in input_text):
        return False, "Input contains invalid control characters."

    return True, ""
def sanitize_input_content(input_text: str) -> str:
    """Sanitize input content by removing or replacing problematic characters.

    * Null bytes are removed.
    * Other control characters below U+0020 (except tab, newline and
      carriage return) are replaced by a space.
    * Line endings are normalized to ``\\n``.
    * Trailing whitespace on each line and leading/trailing whitespace of the
      whole text are removed.

    Line breaks and indentation inside the text are preserved, so markdown
    and code keep their structure.

    Args:
        input_text: The input text to sanitize.

    Returns:
        str: Sanitized input text.
    """
    # One translation pass: code point 0 is dropped, the remaining C0
    # controls (minus tab=9, newline=10, carriage return=13) become spaces.
    replacements = {code: ' ' for code in range(1, 32) if code not in (9, 10, 13)}
    replacements[0] = None
    text = input_text.translate(replacements)

    # Normalize line endings, then tidy whitespace without joining lines.
    text = text.replace('\r\n', '\n').replace('\r', '\n')
    tidy_lines = [line.rstrip() for line in text.split('\n')]
    return '\n'.join(tidy_lines).strip()
def execute_patterns(patterns_to_run: List[str], chain_mode: bool = False, initial_input: Optional[str] = None) -> List[str]:
    """Execute the selected patterns and capture their outputs.

    The input is ``initial_input`` when given, otherwise
    ``st.session_state.input_content``. It is validated and sanitized before
    any pattern runs. Each pattern is executed through the ``fabric`` CLI
    with the currently selected vendor and model.

    Args:
        patterns_to_run: Pattern names to execute, in order.
        chain_mode: When True, each pattern receives the previous pattern's
            output (the first one receives the sanitized input) and the
            chain stops at the first failure. When False, every pattern
            receives the original input.
        initial_input: Optional input overriding the session's input content.

    Returns:
        Markdown strings: a "Using Model" line followed by one entry per
        executed pattern (its output or an error message). Empty when the
        configuration or the input is invalid.
    """
    logger.info(f"Executing {len(patterns_to_run)} patterns")
    st.session_state.chat_output = []
    all_outputs = []
    # The text that was actually validated; non-chain runs use exactly this,
    # so an explicitly passed initial_input is honoured in both modes.
    original_input = initial_input or st.session_state.input_content
    current_input = original_input
    # One timestamp is shared by all log entries of this run.
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Validate configuration
    current_provider = st.session_state.config.get("vendor")
    current_model = st.session_state.config.get("model")
    if not current_provider or not current_model:
        error_msg = "Please select a provider and model first."
        logger.error(error_msg)
        st.error(error_msg)
        return all_outputs

    # Validate input content
    is_valid, error_message = validate_input_content(current_input)
    if not is_valid:
        logger.error(f"Input validation failed: {error_message}")
        st.error(f"Input validation failed: {error_message}")
        return all_outputs

    # Sanitize input content (used as the starting point of a chain)
    try:
        sanitized_input = sanitize_input_content(current_input)
        if sanitized_input != current_input:
            logger.info("Input content was sanitized")
            st.warning("Input content was automatically sanitized for better compatibility.")
            current_input = sanitized_input
    except Exception as e:
        logger.error(f"Error sanitizing input: {str(e)}")
        st.error(f"Error processing input: {str(e)}")
        return all_outputs

    execution_info = f"**Using Model:** {current_provider} - {current_model}"
    all_outputs.append(execution_info)
    logger.info(f"Using model: {current_model} from provider: {current_provider}")

    try:
        for pattern in patterns_to_run:
            logger.info(f"Running pattern: {pattern}")
            try:
                # Pass the selected vendor/model explicitly (same flags as
                # create_pattern) so the run matches the "Using Model" line.
                cmd = [
                    "fabric", "--pattern", pattern,
                    "--vendor", current_provider,
                    "--model", current_model,
                ]
                logger.debug(f"Executing command: {' '.join(cmd)}")

                message = current_input if chain_mode else original_input
                logger.debug(f"Input for pattern {pattern}:\n{message}")

                # Ensure input_data is a string; run() is called in text mode.
                input_data = str(message)
                result = run(
                    cmd,
                    input=input_data,
                    capture_output=True,
                    text=True,
                    check=True
                )
                pattern_output = result.stdout.strip()
                logger.debug(f"Raw output from pattern {pattern}:\n{pattern_output}")

                if pattern_output:
                    # Format output as markdown
                    output_msg = f"### {pattern}\n{pattern_output}"
                    all_outputs.append(output_msg)
                    # Save to output logs
                    save_output_log(pattern, message, pattern_output, timestamp)
                    if chain_mode:
                        current_input = pattern_output
                else:
                    logger.warning(f"Pattern {pattern} generated no output")
                    all_outputs.append(f"### {pattern}\n\nNo output generated.")
            except UnicodeEncodeError as e:
                error_msg = f"### {pattern}\n\n❌ Error: Input contains invalid characters: {str(e)}"
                logger.error(f"Unicode encoding error for pattern {pattern}: {str(e)}")
                all_outputs.append(error_msg)
                if chain_mode:
                    break
            except CalledProcessError as e:
                stderr_text = (e.stderr or "").strip()
                error_msg = f"### {pattern}\n\n❌ Error executing: {stderr_text}"
                logger.error(f"Pattern {pattern} failed: {stderr_text}")
                all_outputs.append(error_msg)
                if chain_mode:
                    break
            except Exception as e:
                error_msg = f"### {pattern}\n\n❌ Failed to execute: {str(e)}"
                logger.error(f"Pattern {pattern} failed: {str(e)}", exc_info=True)
                all_outputs.append(error_msg)
                if chain_mode:
                    break
    except Exception as e:
        error_msg = f"### Error\n\n❌ Error in pattern execution: {str(e)}"
        logger.error(error_msg, exc_info=True)
        st.error(error_msg)

    logger.info("Pattern execution completed")
    return all_outputs
def validate_pattern(pattern_name):
    """Validate a pattern's structure and content.

    A pattern must have a ``system.md``. The sections "# IDENTITY",
    "# STEPS" and "# OUTPUT" are looked for case-insensitively; missing
    sections produce a warning but do not make the pattern invalid.

    Args:
        pattern_name: Name of the pattern directory under ``pattern_dir``.

    Returns:
        ``(is_valid, message)``. The message is exactly "Pattern is valid."
        when nothing is missing, a "Warning: ..." text when sections are
        missing, and an error description when ``is_valid`` is False.
    """
    try:
        system_file = os.path.join(pattern_dir, pattern_name, "system.md")
        if not os.path.exists(system_file):
            return False, "Missing required file: system.md."

        # Explicit UTF-8 so non-ASCII pattern text does not depend on the
        # platform's locale encoding.
        with open(system_file, encoding="utf-8") as f:
            content_lower = f.read().lower()  # lower-cased once for all checks

        required_sections = ("# IDENTITY", "# STEPS", "# OUTPUT")
        missing_sections = [
            section for section in required_sections
            if section.lower() not in content_lower
        ]
        if missing_sections:
            return True, f"Warning: Missing sections in system.md: {', '.join(missing_sections)}"
        return True, "Pattern is valid."
    except Exception as e:
        return False, f"Error validating pattern: {str(e)}"
def pattern_editor(pattern_name):
    """Edit pattern content with simple and advanced editing options.

    * "Simple Editor" shows ``system.md`` (and ``user.md`` when present) in
      one text area each, with its own save button.
    * "Advanced (Wizard)" splits ``system.md`` at its top-level ``# ``
      headings and shows one text area per section; saving writes the
      sections back as ``# <title>``, a blank line, then the body, and
      re-validates the pattern.

    Args:
        pattern_name: Name of the pattern under ``pattern_dir``; nothing is
            rendered when it is empty.
    """
    if not pattern_name:
        return

    pattern_path = os.path.join(pattern_dir, pattern_name)
    system_file = os.path.join(pattern_path, "system.md")
    user_file = os.path.join(pattern_path, "user.md")

    st.markdown(f"### Editing Pattern: {pattern_name}")
    is_valid, message = validate_pattern(pattern_name)
    if not is_valid:
        st.error(message)
    elif message != "Pattern is valid.":
        st.warning(message)
    else:
        st.success("Pattern structure is valid")

    edit_mode = st.radio(
        "Edit Mode",
        ["Simple Editor", "Advanced (Wizard)"],
        key=f"edit_mode_{pattern_name}",
        horizontal=True
    )

    if edit_mode == "Simple Editor":
        if os.path.exists(system_file):
            with open(system_file, encoding="utf-8") as f:
                content = f.read()
            new_content = st.text_area("Edit system.md", content, height=600)
            if st.button("Save system.md"):
                with open(system_file, "w", encoding="utf-8") as f:
                    f.write(new_content)
                st.success("Saved successfully!")
        else:
            st.error("system.md file not found")

        if os.path.exists(user_file):
            with open(user_file, encoding="utf-8") as f:
                content = f.read()
            new_content = st.text_area("Edit user.md", content, height=300)
            if st.button("Save user.md"):
                with open(user_file, "w", encoding="utf-8") as f:
                    f.write(new_content)
                st.success("Saved successfully!")
        return

    # Advanced (Wizard) mode
    if not os.path.exists(system_file):
        st.error("system.md file not found")
        return

    with open(system_file, encoding="utf-8") as f:
        content = f.read()

    # Split only at top-level headings ("# " at the start of a line). A plain
    # split on "#" would also cut "##" sub-headings and "#" characters inside
    # the text, and saving would then rewrite them as new top-level sections.
    chunks = re.split(r"(?m)^#[ \t]+", content)

    # Text before the first heading is kept verbatim (it is not editable here).
    preamble = chunks[0].strip()
    edited_sections = [preamble] if preamble else []

    for index, chunk in enumerate(chunks[1:]):
        if not chunk.strip():
            continue
        title, _, body = chunk.strip().partition("\n")
        title = title.strip()
        st.markdown(f"#### {title}")
        new_body = st.text_area(
            f"Edit {title} section",
            value=body.strip(),
            height=200,
            # Pattern name and position keep keys unique across patterns and
            # across sections that share a title.
            key=f"section_{pattern_name}_{index}_{title}"
        )
        edited_sections.append(f"# {title}\n\n{new_body}")

    if st.button("Save Changes"):
        with open(system_file, "w", encoding="utf-8") as f:
            f.write("\n\n".join(edited_sections))
        st.success("Changes saved successfully!")
        is_valid, message = validate_pattern(pattern_name)
        if not is_valid:
            st.error(message)
        elif message != "Pattern is valid.":
            st.warning(message)
def get_outputs_dir() -> str:
    """Return the outputs directory, creating it when it is missing.

    The directory is ``~/.config/fabric/outputs`` with ``~`` expanded.
    """
    target = os.path.expanduser("~/.config/fabric/outputs")
    os.makedirs(target, exist_ok=True)
    return target
def save_outputs():
    """Save pattern outputs and starred outputs to files.

    Writes ``st.session_state.output_logs`` to ``output_logs.json`` and
    ``st.session_state.starred_outputs`` to ``starred_outputs.json`` in the
    outputs directory.

    Error handling:
    - Creates output directory if it doesn't exist
    - Handles file write permissions
    - Handles JSON serialization errors
    - Logs all errors for debugging and shows them in the UI; nothing is
      raised to the caller
    """
    logger.info("Saving outputs to persistent storage")
    try:
        outputs_dir = get_outputs_dir()
        output_logs_file = os.path.join(outputs_dir, "output_logs.json")
        starred_outputs_file = os.path.join(outputs_dir, "starred_outputs.json")

        # Serialize before opening the files, so a serialization failure
        # cannot leave a truncated, half-written JSON file behind.
        output_logs_json = json.dumps(st.session_state.output_logs, indent=2)
        starred_outputs_json = json.dumps(st.session_state.starred_outputs, indent=2)

        # Save output logs
        with open(output_logs_file, "w", encoding="utf-8") as f:
            f.write(output_logs_json)
        logger.debug(f"Saved output logs to {output_logs_file}")

        # Save starred outputs
        with open(starred_outputs_file, "w", encoding="utf-8") as f:
            f.write(starred_outputs_json)
        logger.debug(f"Saved starred outputs to {starred_outputs_file}")
    except PermissionError as e:
        error_msg = f"Permission denied when saving outputs: {str(e)}"
        logger.error(error_msg)
        st.error(error_msg)
    except (TypeError, ValueError) as e:
        # The json module has no JSONEncodeError; serialization problems are
        # raised as TypeError (unsupported type) or ValueError (e.g. circular
        # reference).
        error_msg = f"Error encoding outputs to JSON: {str(e)}"
        logger.error(error_msg)
        st.error(error_msg)
    except Exception as e:
        error_msg = f"Unexpected error saving outputs: {str(e)}"
        logger.error(error_msg)
        st.error(error_msg)
def load_saved_outputs():
"""Load saved pattern outputs from files.
Error handling:
- Handles missing output files
- Handles corrupted JSON files
- Handles file read permissions
- Initializes empty state if files don't exist
"""
logger.info("Loading saved outputs")