-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathconvert.py
1869 lines (1584 loc) · 71.9 KB
/
convert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# A script for refactoring a Verilog module, then converting it to TL-Verilog.
# The refactoring steps are performed by an LLM such as ChatGPT-4 via its API.
# Manual refactoring is also possible. All refactoring steps are formally verified using SymbiYosys.
# Usage:
# python3 convert.py
# This begins or continues the conversion process for the only *.v file in the current directory.
# This script works with these files:
# - <module_name>_orig.v: The trusted Verilog module to convert. This is the original file for the current conversion step.
# - <module_name>.v: The current WIP refactored/modified Verilog module, against which FEV will be run.
# - prompt_id.txt: A file containing, e.g. {"id": 5, "desc": "Update clocks"}, the ID and desc field of the current prompt.
# (Formerly, this was just an ID number.) (Note, the actual prompt may have been modified manually.)
# - messages.json: The messages to be sent to the LLM API (as in the ChatGPT API).
# - chkpt.v: A link to the last checkpointed Verilog file.
# Additionally, these files may be created and captured in the process:
# - tmp/fev.sby & tmp/fev.eqy: The FEV script for this conversion job.
# - tmp/m5/*: Temporary files used for M5 preprocessing of a prompt.
# - tmp/pre_llm.v: The Verilog file sent to the LLM API.
# - tmp/llm_resp.v: The Verilog response field from the LLM (if any).
# - tmp/diff.v: The diff of pre_llm.v and llm_resp.v (if response contained Verilog field).
# - tmp/diff_mod.v: A modification of diff.v to ignore "..." diffs (if response contained Verilog field).
# - tmp/llm_upd.v: The updated Verilog file after applying diff_mod.v (if response contained Verilog field).
# - tmp/llm.v: The updated (or not) Verilog (llm_upd.v or pre_llm.v).
# - llm_response.txt: The LLM response file.
#
# A history of all refactoring steps is stored in history/#, where "#" is the "refactoring step", starting with history/1.
# This directory is initialized when the step is begun, and fully populated when the refactoring change is accepted.
# Contents includes:
# - history/#/prompt_id.txt: As above
# - history/#/<module_name>.v: The refactored file at each step.
# - history/#/messages.json: The messages sent to the LLM API for each step.
# Although Git naturally captures a history, it may be desirable to capture this folder in Git, simply for convenience, since it may be desirable to
# easily examine file differences or to rework the conversion steps after the fact.
#
# Each refactoring step may involve a number of individual code modifications, recorded in a modification history within the refactoring step directory.
# Each modification is captured, whether accepted, rejected, or reverted.
#
# A modification is stored in history/#/mod_#/ (where # are sequential numbers).
# Contents include:
# - history/#/mod_#/<module_name>.v: The modified Verilog file.
# - history/#/mod_#/messages.json: The messages sent to the LLM API (for LLM modifications only).
# - history/#/mod_#/status.json: Metadata about the modification, as below, written after testing.
#
# history/#/mod_0 are checkpoints of the initial code for each refactoring step. Thus, history/1/mod_0/<module_name>.v is the initial
# code for the entire conversion.
#
# history/#/mod_# can also be a symlink to a prior history/#/mod_#, recording a code reversion. A reversion will not reference
# another reversion.
#
# The status.json file reflects the status of the modification, updated as fields become known:
# {
# "by": "human"|"llm",
# "compile": "passed"|"failed" (or non-existent if not compiled),
# "fev": "passed"|"failed" (or non-existent if not run),
# "incomplete": true|false A sticky field (held for each checkpoint of the refactoring step) assigned or updated by each LLM run,
# indicating whether the LLM response was incomplete.
# "accepted": true|non-existent Exists as true for the final modification of a refactoring step that was accepted.
# }
#
# With each rejected refactoring step, a new candidate is captured under a new candidate number under the next history number directory.
#
# <repo>/prompts.json contains the default prompts used for refactoring steps as a JSON array of objects with the following fields:
# - desc: a brief description of the refactoring step
# - background: (opt) background information that may be relevant to this refactoring step
# - prompt: prompt string, preprocess using M5 (if necessary), passing M5 variables from sticky fields
# - must_produce: (opt) an array of strings representing sticky fields that the LLM must produce in its response
# - may_produce: (opt) an array of strings representing sticky fields that the LLM may produce in its response.
# - if: (opt) an object with fields that represent values of sticky fields; if given and any match, this prompt will be used ("" matches undefined)
# Each field may have an array value rather than a string, in which case any array value may match.
# - unless: (opt) an object with fields that represent values of sticky fields; if given, unless all match, this prompt will be used ("" matches undefined)
# Each field may have an array value rather than a string, in which case any array value may match.
# - needs: (opt) an array of strings representing sticky fields whose values are to be reported in the prompt
# - consumes: (opt) an array of strings representing sticky fields that are consumed by this prompt
#
# When launched, this script first determines the current state of the conversion process. This state is:
# - The current candidate:
# - The current refactoring step, which is the latest history/#.
# - The next candidate number, which is the next history/#/mod_#
# - The next prompt ID, which is the ID of the prompt for the current refactoring step. This is the next prompt ID following the
# most recent that can be found in history/#/.
# Note that history/#/mod_#/ can be traced backward to determine what has been done so far.
#
# This is a command-line utility which prompts the user for input. Edits to <module_name>.v can be made manually.
# It is suggested to have <module_name>.v open in an editor and in a diff utility, such as meld (prompted and launched by this script), while running this script.
# Additionally, messages.json (and the "plan" field of status.json) may be modified manually before running the LLM. Users
# must be careful to save files before responding to prompts.
#
# To begin each step, the user is given instructions and prompted for input.
# The user makes edits and enters commands until a candidate is accepted or rejected, and the process repeats.
import os
import subprocess
from openai import OpenAI
import sys
import termios
import tty
import atexit
import signal
from select import select
from abc import ABC, abstractmethod
import json
import re
import shutil
import stat
# Insertion-ordered dictionaries are relied upon throughout, which is only guaranteed from Python 3.7 on.
MIN_PYTHON = (3, 7)
if sys.version_info < MIN_PYTHON:
    print("Error: This script requires Python 3.7 or later.")
    sys.exit(1)
#############
# #
# Classes #
# #
#############
# Context manager that write-protects a file for the duration of a block:
#   with FileLocked("file.txt"):
#       ...  # file.txt is read-only here
class FileLocked:
    def __init__(self, filepath):
        self.filepath = filepath
        self.original_permissions = None

    def __enter__(self):
        # Remember the current mode so it can be restored on exit.
        self.original_permissions = os.stat(self.filepath).st_mode
        # Drop every write bit (user/group/other), leaving the file read-only.
        read_only = self.original_permissions & ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
        os.chmod(self.filepath, read_only)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Put the original mode back, even if the block raised.
        os.chmod(self.filepath, self.original_permissions)
# Abstract base class for LLM API back ends.
class LLM_API(ABC):
    name = "LLM"   # Human-readable provider name; subclasses override.
    model = None   # Default model identifier, set via setDefaultModel().

    def __init__(self):
        pass

    def setDefaultModel(self, model):
        """Validate the given model name, then make it the default."""
        self.validateModel(model)
        self.model = model

    def validateModel(self, model):
        """Base-class fallback: no models are known, so any name fails."""
        print("Error: Model " + model + " not found.")
        fail()

    # Run the LLM API on the prompt file, producing a (TL-)Verilog file.
    @abstractmethod
    def run(self, messages, verilog, model):
        pass
# A class responsible for bundling messages objects into text and vice versa.
# This class isolates the format of LLM messages from the functionality and enables message formats to be used
# that are optimized for the LLM.
# NOTE(review): this class does not derive from abc.ABC, so the @abstractmethod decorators below are not
# enforced at instantiation. That appears deliberate: PseudoMarkdownMessageBundler does not implement all of
# these methods. Confirm before adding ABC as a base.
class MessageBundler:
    # TODO: Unused. Was this being added, or never needed?
    # Convert the given object to text.
    # The object format is:
    # {
    #   "desc": "This is a description.",
    #   "background": (optional) "This is background information.",
    #   "prompt": "This is a prompt.\n\nIt has multiple lines."
    # }
    # NOTE(review): the parameter name "json" shadows the module-level json import within this method.
    @abstractmethod
    def obj_to_content(self, json):
        pass

    # TODO: Unused. Was this being added, or never used?
    # Convert the given LLM response text into an object of the form:
    # {
    #   "overview": "This is an overview.",
    #   "verilog": "This is the Verilog code.",
    #   "notes": "These are notes.",
    #   "issues": "These are issues.",
    #   "incomplete": true,
    #   "plan": "Since changes are incomplete, this is the plan for completing the step."
    # }
    @abstractmethod
    def content_to_obj(self, content):
        pass

    # Add Verilog to the last message to be sent to the API.
    # messages: The messages.json object in OpenAI format.
    # verilog: The current Verilog file contents.
    @abstractmethod
    def add_verilog(self, messages, verilog):
        pass
# An LLM API class for OpenAI's ChatGPT API.
class OpenAI_API(LLM_API):
    name = "OpenAI"
    model = "gpt-3.5-turbo"  # default model (can be overridden in run(..))
    # Reasoning models: "o1-preview", "o1-mini"

    def __init__(self):
        # Resolve API credentials (env var, ~/.openai files, or interactive prompt) and create the client.
        super().__init__()
        # if OPENAI_API_KEY env var does not exist, get it from ~/.openai/key.txt or input prompt.
        if not os.getenv("OPENAI_API_KEY"):
            key_file_name = os.path.expanduser("~/.openai/key.txt")
            if os.path.exists(key_file_name):
                with open(key_file_name) as file:
                    # Strip whitespace: a trailing newline from the file would otherwise be sent
                    # in the Authorization header and break authentication.
                    os.environ["OPENAI_API_KEY"] = file.read().strip()
            else:
                os.environ["OPENAI_API_KEY"] = input("Enter your OpenAI API key: ")
        # Use an organization in the request if one is provided, either in the OPENAI_ORG_ID env var or in ~/.openai/org_id.txt.
        self.org_id = os.getenv("OPENAI_ORG_ID")
        if not self.org_id:
            org_file_name = os.path.expanduser("~/.openai/org_id.txt")
            if os.path.exists(org_file_name):
                with open(org_file_name) as file:
                    # Strip whitespace for the same reason as the API key above.
                    self.org_id = file.read().strip()
        # Init OpenAI.
        self.client = OpenAI() if self.org_id is None else OpenAI(organization=self.org_id)
        self.models = self.client.models.list()

    def validateModel(self, model):
        # Fail (exit) unless the model name is reported by the OpenAI models endpoint.
        # Get the data for the model (or None if not found)
        model_data = next((item for item in self.models.data if hasattr(item, 'id') and item.id == model), None)
        if model_data is None:
            print("Error: Model " + model + " not found.")
            fail()

    # Set up the initial messages object for the current refactoring step based on the given system message and message parameter.
    def initPrompt(self, system, message):
        return [
            {"role": "system", "content": system},
            {"role": "user", "content": message}
        ]

    # Run the LLM API on the messages.json file appended with the verilog code, returning the response string from the LLM.
    def run(self, messages, verilog, model=None):
        if model is None:
            model = self.model
        self.validateModel(model)
        # Add verilog to the last message.
        message_bundler.add_verilog(messages, verilog)
        # Call the API.
        print("\nCalling " + model + "...")
        # TODO: Not supported in ChatGPT-3.5: response_format = {"type": "json_object"}
        # TODO: o1-preview model reports: {'message': "Unsupported value: 'messages[0].role' does not support 'system' with this model.", 'type': 'invalid_request_error', 'param': 'messages[0].role', 'code': 'unsupported_value'"}
        api_response = self.client.chat.completions.create(model=model, messages=messages, max_tokens=3000, temperature=0.0)
        print("Response received from " + model)
        # Parse the response.
        try:
            response_str = api_response.choices[0].message.content
            finish_reason = api_response.choices[0].finish_reason
            completion_tokens = api_response.usage.completion_tokens
            print("API response finish reason: " + finish_reason)
            print("API response completion tokens: " + str(completion_tokens))
        except Exception as e:
            print("Error: API response is invalid.")
            print(str(e))
            fail()
        return response_str
# A message bundler that converts messages to and from the pseudo-Markdown format used in LLM messages.
class PseudoMarkdownMessageBundler(MessageBundler):
    # Convert the given object to a pseudo-Markdown format. Markdown syntax is familiar to the LLM, and fields can be
    # provided without any awkward escaping and other formatting, as described in default_system_message.txt.
    # Example JSON:
    #   {"prompt": "Do this...", "verilog": "module...\nendmodule"}
    # Example output:
    #   ## prompt
    #
    #   Do this...
    #
    #   ## verilog
    #
    #   module...
    #   endmodule
    def obj_to_request(self, obj):
        content = ""
        separator = ""
        for key in obj:
            # Convert (single-word) key to title case.
            name = key[0].upper() + key[1:]
            content += separator + "## " + name + "\n\n" + obj[key]
            separator = "\n\n"
        return content

    """
    # TODO: Maybe this notion of sections should be replaced with an option for responses to
    # use "\n...\n" to omit portions of code. Yes... do this!
    #
    # Split a Verilog file into sections delimited by "// LLM: [Omitted ]Section: <name>"
    # (as described in default_system_message.txt).
    # body: The Verilog code from a "verilog" field of an LLM request or response.
    # response: A boolean indicating whether the body is a response (vs. request).
    def split_sections(self, body, response):
        # Match sections, delimited by "// LLM: Section: <name>".
        sections = re.split(r"\/\/ LLM:\s*(Omitted)?\s*Section:\s*([^\n]+)\n", body)
        # Give the first section a name if it is missing.
        if (sections[0] == ""):
            # Delete the first empty string.
            del sections[0]
        else:
            # Add an empty name and not-omitted to the first section.
            sections.insert(0, "")  # Name
            sections.insert(0, "")  # Not omitted
        # List should contain an even number of elements.
        if len(sections) % 3 != 0:
            print("Bug: Section splitting failed.")
            fail()
        # Convert the list to dictionaries of code and omitted.
        ret_code = {}
        ret_omitted = {}
        for i in range(0, len(sections), 3):
            omitted = sections[i] == "Omitted"
            name = sections[i + 1]
            code = sections[i + 2]
            ret_code[name] = code
            ret_omitted[name] = omitted
            # Requests cannot have Omitted sections. Omitted sections cannot contain code.
            if omitted:
                if response:
                    if code != "":
                        print("Warning: Verilog of response has an omitted section with code.")
                else:
                    print("Warning: Verilog of request has an omitted section.")
        return [ret_code, ret_omitted]
    """

    # Convert the given LLM API response string from the pseudo-Markdown format requested into an object, as described
    # in default_system_message.txt.
    # response: The response string from the LLM API.
    # verilog: The original Verilog code, needed to reconstruct sections that are omitted in the response.
    def response_to_obj(self, response, verilog):
        # Parse the response, line by line, looking for second-level Markdown header lines.
        lines = response.split("\n")
        l = 0
        fields = {}
        field = None
        while l < len(lines):
            # Parse body lines until the next field header or end of message.
            body = ""
            separator = ""
            while l < len(lines) and not lines[l].startswith("## "):
                if (body != "") or (re.match(r"^\s*$", lines[l]) is None):  # Ignore leading blank lines.
                    body += separator + lines[l]
                    separator = "\n"
                l += 1
            # Found header line or EOM.
            # Process the body field that ended.
            # Strip trailing whitespace.
            body = re.sub(r"\s*$", "", body)
            if field is None:
                if body != "":
                    print("Error: The following body text was found before the first header and will be ignored:")
                    print(body)
            else:
                # "verilog" field should not be in block quotes, but it's hard to convince the LLM, so strip them if present.
                if field == "verilog":
                    body, n = re.subn(r"^```(verilog)?\n(.*)\n+```\n?$", r"\2\n", body, flags=re.DOTALL)
                    if n != 0:
                        print("Warning: The \"verilog\" field of the response was contained in block quotes. They were stripped.")
                    # Make sure the Verilog code ends with a newline (because we pattern match lines ending in newline).
                    if body != "" and body[-1] != "\n":
                        body += "\n"
                    """
                    # Split the request and response Verilog into sections.
                    [response_sections, response_omitted] = self.split_sections(body, True)
                    [orig_sections, orig_omitted] = self.split_sections(verilog, False)
                    # Reconstruct the full response Verilog, adding omitted sections from the original Verilog.
                    body = ""
                    for name, code in response_sections.items():
                        if name:
                            body += "// LLM: Section: " + name + "\n"
                        omitted = response_omitted[name]
                        # Add the section from the original Verilog if it was omitted.
                        if omitted:
                            body += orig_sections[name]
                        else:
                            body += code
                    """
                    with open("tmp/llm_resp.v", "w") as f:
                        f.write(body)
                    if body == "...\n":
                        # The response Verilog is just "...", i.e. unchanged. Won't use intermediate files.
                        # Delete them to avoid confusion.
                        # BUG FIX: os.remove() takes a single path, so remove the files one at a time
                        # (the old call passed three paths and raised at runtime).
                        for tmp_file in ("tmp/diff.txt", "tmp/diff_mod.txt", "tmp/llm_upd.v"):
                            if os.path.exists(tmp_file):
                                os.remove(tmp_file)
                        body = verilog
                    else:
                        if ChangeMerger.merge_changes("tmp/pre_llm.v", "tmp/llm_resp.v", "tmp/diff.txt", "tmp/diff_mod.txt", "tmp/llm_upd.v"):
                            with open("tmp/llm_upd.v") as f:
                                body = f.read()
                        else:
                            body = False
                # Capture the previous field.
                # Boolean responses.
                if body == "true" or body == "false":
                    body = body == "true"
                # Capture the field body.
                fields[field] = body
            if l < len(lines):
                # Parse the header line with a regular expression.
                field = re.match(r"## +(\w+)", lines[l]).group(1)
                # The field name should be lower-case words with underscore delimitation.
                # BUG FIX: re.match(r"[a-z_]*", ...) always succeeds (zero-length match), so the
                # warning below could never fire; fullmatch gives the intended check.
                if not re.fullmatch(r"[a-z_]+", field):
                    print("Warning: The following malformed field name was found in the response:")
                    print(field)
                # Convert field name to lower case.
                field = field.lower()
                # Check for legal field name.
                if field not in response_fields | set(prompts[prompt_id].get("must_produce", [])) | set(prompts[prompt_id].get("may_produce", [])):
                    print("Warning: The following non-standard field was found in the response:")
                    print(field)
                # Done with this header line.
                l += 1
        return fields

    # Add Verilog to the last message to be sent to the API.
    # messages: The messages.json object in OpenAI format.
    # verilog: The current Verilog file contents.
    def add_verilog(self, messages, verilog):
        # Add verilog to the last message.
        messages[-1]["content"] += "\n\n## verilog\n\n" + verilog
# A class for incorporating changes from the LLM into the Verilog file.
#
# Requests to the LLM include Verilog file contents.
# Responses from the LLM include updated Verilog. This Verilog need not be provided in its entirety. "..." lines
# can be used by the LLM to represent unchanged portions of the file.
# We rely on diff and patch to reconstruct the full updated Verilog file as follows:
#   1) We use diff to identify the changes, including the replacement of sections of code with "..." lines.
#   2) We modify the diff file to remove the "..." substitutions.
#   3) We apply the modified diff file to the original Verilog file to produce the updated Verilog file.
class ChangeMerger:
    # Matches a unified-diff hunk header, capturing old/new start lines and lengths.
    hunk_header_re = re.compile(r'^@@ -(\d+),(\d+) \+(\d+),(\d+) @@')

    # Helper for adjust_diff to write a hunk to the output file after adjusting its header.
    # hunk: list of diff lines; hunk[0] is the "@@ ... @@" header.
    # orig_offset/mod_offset: cumulative line-count adjustments from previously skipped hunks.
    @staticmethod
    def write_hunk(hunk, outfile, orig_offset, mod_offset):
        if not any(line.startswith(('+', '-')) for line in hunk[1:]):  # Check if hunk has changes
            return  # Skip empty hunks
        # Adjust the hunk header
        match = ChangeMerger.hunk_header_re.match(hunk[0])
        if match:
            orig_start, orig_len, mod_start, mod_len = map(int, match.groups())
            # Adjust starting line numbers and lengths
            orig_start -= orig_offset
            mod_start -= mod_offset
            # Recompute lengths from the hunk body ('-'/' ' lines count toward the original,
            # '+'/' ' lines toward the modified file).
            orig_len = sum(1 for line in hunk[1:] if line.startswith(('-', ' ')))
            mod_len = sum(1 for line in hunk[1:] if line.startswith(('+', ' ')))
            hunk[0] = f"@@ -{orig_start},{orig_len} +{mod_start},{mod_len} @@\n"
            outfile.writelines(hunk)

    # Create a modified diff file, removing "..." changes.
    # Line numbers in the diff file must be adjusted to reflect the changes.
    # Parameters:
    #   input_diff: The original diff file path.
    #   output_diff: The modified diff file path.
    @staticmethod
    def adjust_diff(input_diff, output_diff):
        current_hunk = []       # To accumulate lines for the current hunk
        skip_hunk = False       # To track whether to skip the current hunk
        orig_line_offset = 0    # Tracks offset adjustments for the original file
        mod_line_offset = 0     # Tracks offset adjustments for the modified file
        with open(input_diff, 'r') as infile, open(output_diff, 'w') as outfile:
            for line in infile:
                if ChangeMerger.hunk_header_re.match(line):
                    # Handle the previous hunk
                    if not skip_hunk:
                        ChangeMerger.write_hunk(current_hunk, outfile, orig_line_offset, mod_line_offset)
                    current_hunk = [line]  # Start a new hunk
                    skip_hunk = False      # Reset skip flag
                elif line.startswith('+...'):
                    skip_hunk = True       # Mark this hunk for skipping
                elif not skip_hunk:
                    current_hunk.append(line)  # Accumulate lines for the current hunk
                elif line.startswith('-') and skip_hunk:
                    # Update offsets when skipping removed lines
                    orig_line_offset += 1
                elif line.startswith('+') and skip_hunk:
                    # Update offsets when skipping added lines
                    mod_line_offset += 1
            # Handle the final hunk
            if not skip_hunk:
                ChangeMerger.write_hunk(current_hunk, outfile, orig_line_offset, mod_line_offset)

    # Merge changes.
    # Parameters:
    #   orig_file: The original file path.
    #   modified_file: The modified file path (including "..." lines).
    #   diff_file: The path for the diff file.
    #   modified_diff_file: The path for the modified diff file.
    #   output_file: The output file path.
    # Returns:
    #   Success
    @staticmethod
    def merge_changes(orig_file, modified_file, diff_file, modified_diff_file, output_file):
        # Create a diff file between the original and modified files.
        status = os.system(f"diff -u {orig_file} {modified_file} > {diff_file}")
        # Adjust the diff file to remove "..." substitutions.
        ChangeMerger.adjust_diff(diff_file, modified_diff_file)
        # Apply the modified diff file to the original file to produce the output file.
        rslt = run_command(['patch', '-o', output_file, orig_file, modified_diff_file])
        return rslt.returncode == 0
###############
# #
# Functions #
# #
###############
###########
# Generic #
###########
# Run a system command, reporting the error if it fails and produces stderr output.
# Return the same structure as subprocess.run.
def run_command(cmd):
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0 and result.stderr:
        print("Error: Command failed.")
        print(f" '{' '.join(cmd)}' failed as follows:")
        print(result.stderr)
    return result
##################
# Usage and Exit #
##################
# Report a usage message, then exit with failure.
def usage():
    print("Usage: python3 .../convert.py")
    print(" Call from a directory containing a single Verilog file to convert or a \"history\" directory.")
    fail()
def fail():
    """Terminate the script with a nonzero exit status."""
    raise SystemExit(1)
# Exit-time hook. (Terminal-settings restoration was previously done here and may be
# reinstated; see the commented-out termios code in version control history.)
def cleanup():
    print("Exiting cleanly.")

# Run cleanup() when the interpreter exits.
atexit.register(cleanup)
# Catch signals for proper cleanup: exiting via sys.exit() lets the atexit handler run.
def signal_handler(signum, frame):
    print(f"Caught signal {signum}, exiting...")
    sys.exit(1)

# Install the handler for the signals we can reasonably intercept.
for sig in (signal.SIGABRT, signal.SIGINT, signal.SIGTERM):
    signal.signal(sig, signal_handler)
##################
# Terminal input #
##################
def set_raw_mode(fd):
    """Disable canonical (line-buffered) input on fd so keypresses are delivered immediately."""
    attrs = termios.tcgetattr(fd)
    attrs[3] &= ~termios.ICANON  # index 3 is the local-modes (lflag) word
    termios.tcsetattr(fd, termios.TCSANOW, attrs)
def set_cooked_mode(fd):
    """Re-enable canonical (line-buffered) input on fd."""
    attrs = termios.tcgetattr(fd)
    attrs[3] |= termios.ICANON  # index 3 is the local-modes (lflag) word
    termios.tcsetattr(fd, termios.TCSANOW, attrs)
# Restore canonical input mode up front, in case a previous run exited while raw mode was active.
set_cooked_mode(sys.stdin.fileno())
def getch():
    """Read a single character from stdin without waiting for Enter."""
    try:
        set_raw_mode(sys.stdin.fileno())
        # Block until stdin is readable, then grab exactly one character.
        select([sys.stdin.fileno()], [], [], None)
        ch = sys.stdin.read(1)
    finally:
        # Always restore cooked mode, even if interrupted.
        set_cooked_mode(sys.stdin.fileno())
    return ch
# Prompt the user for a single char input (single keypress).
# options: optional collection of accepted characters; default: substituted for any other key.
def prompt_user(prompt, options=None, default=None):
    text = prompt
    if options:
        text += " [" + "/".join(options) + "]"
    if default:
        text += " (default: " + default + ")"
    print(text)
    while True:
        print("> ", end="")
        ch = getch()
        print("")
        # Substitute the default for an unrecognized key; re-prompt when there is no default.
        if options and ch not in options:
            if default:
                ch = default
            else:
                print("Error: Invalid input. Try again.")
                continue
        return ch
# Accept terminal input command character from among the given list.
def get_command(options):
    while True:
        print("")
        ch = prompt_user("Press one of the following command keys: " + ", ".join(options))
        if ch in options:
            return ch
        print("Error: Invalid key. Try again.")
# Pause for a key press, optionally appending a note to the prompt.
def press_any_key(note=""):
    print(f"Press any key to continue...{note}\n>", end="")
    getch()
###################
# File Operations #
###################
# Determine if a filename has a Verilog/SystemVerilog extension.
def is_verilog(filename):
    return filename.endswith((".v", ".sv"))
# Return True when the two files differ (or cannot be compared by `diff`), False when identical.
# Arguments are passed to the external `diff` utility as a list (not a shell string) so filenames
# containing spaces or quotes are handled safely; stdout is discarded like the old "> /dev/null".
def diff(file1, file2):
    result = subprocess.run(["diff", "-q", file1, file2], stdout=subprocess.DEVNULL)
    return result.returncode != 0
def copy_if_different(src, dest):
    """Copy src over dest only when their contents differ (avoids needless rewrites)."""
    if not diff(src, dest):
        return
    shutil.copyfile(src, dest)
# Read the prompt ID from the given (open) file object.
# Two formats are supported: a bare number, or {"id": number, "desc": "description"}.
# Returns the (possibly corrected) numeric prompt ID.
def read_prompt_id(file):
    # BUG FIX: this previously read from an undefined name `f` rather than the `file` parameter.
    prompt_id = json.loads(file.read())
    if type(prompt_id) == dict:
        # Verify that the description matches the prompt.
        desc = prompt_id["desc"]
        id = prompt_id["id"]
        if prompts[id]["desc"] != desc:
            # Prompts have changed. See if we can identify the proper ID by looking up the description in prompts_by_desc.
            prompt_id_str = "{" + str(prompt_id["id"]) + ", " + prompt_id["desc"] + "}"
            if desc in prompts_by_desc:
                id = prompts_by_desc[desc]["index"]
                # Report the correction.
                print("Warning: Prompts have been changed. Corrected ID " + prompt_id_str + " to " + str(id) + " for current prompt: \"" + desc + "\".")
            else:
                print("\nError: The description in \"prompt_id.txt\" does not match any prompt description in prompts.json.")
                print(" The description in \"prompt_id.txt\" is: \"" + desc + "\".")
                # BUG FIX: previously indexed prompts with the dict `prompt_id` (a TypeError); use the numeric `id`.
                print(" The descriptions for ID " + prompt_id_str + " in prompts.json is:" + prompts[id]["desc"] + "\".")
                # Continue?
                ch = prompt_user("Continue with the current prompt ID?", {"y", "n"}, "n")
                if ch == "n":
                    fail()
        # Correct the prompt ID.
        prompt_id = id
    return prompt_id
###################################
# Working with Status and History #
###################################
def changes_pending():
    """True when the working Verilog file differs from its latest checkpointed copy."""
    checkpointed = mod_path() + "/" + working_verilog_file_name
    return os.path.exists(checkpointed) and diff(working_verilog_file_name, checkpointed)
# See if there were any manual edits to the Verilog file and capture them in the history if so.
def checkpoint_if_pending():
    if not changes_pending():
        return
    print("Manual edits were made and are being checkpointed.")
    checkpoint({"by": "human"})
# Functions that determine the state of the refactoring step based on the state of the files.
# TODO: replace?
#def llm_passed():
# return os.path.exists(llm_verilog_file_name)
def llm_finished():
    """True once the latest LLM response was complete (status 'incomplete' absent or False)."""
    status = readStatus()
    return not status.get("incomplete", True)
def fev_passed():
    """True when FEV produced a PASS marker and the FEV'ed source matches the current Verilog."""
    if not os.path.exists("fev/PASS"):
        return False
    return os.system(f"diff {module_name}.v fev/src/{module_name}.v") == 0
def update_chkpt():
    """Point the chkpt.v symlink at the latest checkpointed Verilog file."""
    os.system(f"ln -sf {mod_path()}/{working_verilog_file_name} chkpt.v")
def update_feved():
    """Point the feved.v symlink at the most recently FEV-passed Verilog file."""
    os.system(f"ln -sf {most_recently_feved_verilog_file()} feved.v")
def readStatus(mod = None):
    # Read the status dict from checkpoint <mod> (defaulted to the latest, mod_num)
    # of the current refactoring step.
    # Returns {} if status.json is missing or cannot be parsed.
    # Default mod to mod_num
    if mod is None:
        mod = mod_num
    # Read status from latest history change directory.
    try:
        with open(mod_path(mod) + "/status.json") as file:
            return json.load(file)
    except (OSError, ValueError):
        # Missing or malformed status.json means "no status" rather than an error.
        # (Narrowed from a bare except, which also swallowed KeyboardInterrupt/SystemExit;
        # json.JSONDecodeError is a subclass of ValueError.)
        return {}
def writeStatus(status):
    # Persist the given status dict as status.json in the latest history change directory.
    status_path = mod_path() + "/status.json"
    with open(status_path, "w") as file:
        json.dump(status, file)
# Evaluate the given anonymous function, fn(mod), from the most recent modification to the least recent until fn indicates completion.
# fn(mod) returns False to keep iterating or True to terminate.
# Return the terminating mod number or None.
def most_recent(fn, mod=None):
    # Walk checkpoints from most to least recent, resolving reversion symlinks
    # via actual_mod, and return the first (resolved) mod number for which
    # fn(mod) is truthy, or None if fn never matches.
    # Note: iteration continues from the resolved mod number, so a reversion
    # skips back past the checkpoints it reverted.
    current = mod_num if mod is None else mod
    while current >= 0:
        current = actual_mod(current)
        if fn(current):
            return current
        current -= 1
    return None
def most_recently_feved_verilog_file():
    # Return the path of the Verilog file from the most recent checkpoint whose
    # status records a passing FEV run.
    # Raises AssertionError if no checkpoint has passed FEV.
    last_fev_mod = most_recent(lambda mn: (readStatus(mn).get("fev") == "passed"))
    # Use an explicit raise rather than the assert statement so this sanity check
    # survives running under "python -O" (which strips asserts), and so the
    # failure carries a meaningful message.
    if last_fev_mod is None:
        raise AssertionError("No FEV-passed checkpoint found.")
    return mod_path(last_fev_mod) + "/" + working_verilog_file_name
# Number of the most recent modification (that actually made a change) or None.
def most_recent_mod():
    # Number of the most recent checkpoint that actually modified the code, or None.
    def was_modified(mod):
        return readStatus(mod).get("modified", False)
    return most_recent(was_modified)
# The path of the latest modification of this refactoring step.
def mod_path(mod = None):
    # Directory path of checkpoint <mod> (defaulted to the latest, mod_num)
    # within the current refactoring step.
    if mod is None:
        mod = mod_num
    return "history/%s/mod_%s" % (refactoring_step, mod)
# Set mod_num to the maximum for the current refactoring step.
def set_mod_num():
    # Scan the history directory to find the highest existing mod number for the
    # current refactoring step (-1 if none exist) and record it in global mod_num.
    global mod_num
    next_mod = 0
    while os.path.exists(mod_path(next_mod)):
        next_mod += 1
    mod_num = next_mod - 1
# Get the actual modification of the given modification number (or current). In other words, if the given mod is a
# reversion, follow the symlink.
def actual_mod(mod=None):
    # Resolve a reversion checkpoint to the mod number it reverts to.
    # A reversion is stored as a symlink named after its target ("mod_<k>");
    # a regular checkpoint resolves to itself.
    if mod is None:
        mod = mod_num
    path = mod_path(mod)
    if not os.path.islink(path):
        return mod
    # Link target has the form "mod_<k>"; strip the 4-char "mod_" prefix.
    return int(os.readlink(path)[4:])
# Capture Verilog file in a new history/#/mod_#/, and if this was an LLM modification, capture messages.json and llm_response.txt.
# status: The status to save with the checkpoint, updated as new status.
# old_status: For use only for the first checkpoint of a refactoring step. This is the status from the prior refactoring step.
# verilog_file: The verilog file to capture (defaulted to working_verilog_file_name and checkpointed as working_verilog_file_name regardless).
# Sticky status is applied from current status. Status["incomplete"] will be carried over from the prior checkpoint for non-LLM updates.
def checkpoint(status, old_status = None, verilog_file = None):
    """Capture the Verilog file in a new history/<step>/mod_<n>/ checkpoint directory.

    status: status dict to record with this checkpoint; mutated in place to
        carry over sticky fields from the prior checkpoint.
    old_status: status from the prior refactoring step; used only for the first
        checkpoint of a step (otherwise re-read from the latest checkpoint).
    verilog_file: file to capture (defaulted to working_verilog_file_name and
        checkpointed under that name regardless).
    """
    if verilog_file is None:
        verilog_file = working_verilog_file_name
    global mod_num
    # Carry over status from the prior checkpoint that is sticky (not in status_fields).
    # Also, carry over "plan" within the refactoring step excluding "llm" checkpoints.
    if mod_num >= 0:
        # Not the first checkpoint of this step: sticky fields come from the
        # latest checkpoint, overriding any caller-provided old_status.
        old_status = readStatus()
    for field in old_status:
        # Some fields are provided by the LLM and are sticky only within the refactoring step.
        if field in llm_status_fields:
            if mod_num >= 0 and status.get("by") != "llm":
                status[field] = old_status[field]
        elif field not in status and field not in status_fields:
            status[field] = old_status[field]
    # Capture the current Verilog file in new mod dir and update chkpt.v.
    mod_num += 1
    mod_dir = mod_path()
    os.mkdir(mod_dir)
    os.system("cp " + verilog_file + " " + mod_dir + "/" + working_verilog_file_name)
    # Capture messages.json and llm_response.txt if this was an LLM modification.
    if status.get("by") == "llm":
        os.system("cp messages.json llm_response.txt " + mod_dir)
    # Write status.json.
    writeStatus(status)
    # Update chkpt.v and feved.v links to reflect this new checkpoint.
    update_chkpt()
    update_feved()
    # Make Verilog file read-only (to prevent inadvertent modification, esp. in meld).
    # ("status.json" may still be updated with FEV status.)
    os.system("chmod a-w " + mod_dir + "/" + working_verilog_file_name)
# Create a reversion checkpoint as a symlink, or if the previous change was a reversion, update its symlink.
def checkpoint_reversion(prev_mod):
    # Record a reversion to checkpoint prev_mod as a symlink-style checkpoint.
    # If the latest checkpoint is already a reversion, retarget that symlink
    # rather than allocating another checkpoint slot.
    global mod_num
    latest = mod_path()
    if os.path.islink(latest):
        # Remove the old reversion symlink so it can be recreated with a new target.
        os.remove(latest)
    else:
        # Allocate a new checkpoint slot for the reversion.
        mod_num += 1
    os.symlink("mod_" + str(prev_mod), mod_path())
    # Refresh the chkpt.v and feved.v convenience links.
    update_chkpt()
    update_feved()
######################
# Formatting/Parsing #
######################
# Process JSON with newlines in strings into proper JSON.
def from_extended_json(ejson):
    """Convert "extended JSON" (raw newlines allowed in strings) to strict JSON.

    A raw newline inside a string literal becomes the two-character escape
    sequence '\\n'. For backward compatibility with an old syntax, a '+'
    immediately following such a newline is dropped (so "\\n+" and "\\n" both
    map to '\\n'). Newlines outside strings are preserved as-is.
    """
    pieces = []
    inside_string = False
    skip_plus = False
    for ch in ejson:
        # Consume a legacy '+' continuation marker right after an in-string newline.
        if skip_plus:
            skip_plus = False
            if ch == '+':
                continue
        if ch == '"':
            inside_string = not inside_string
        if inside_string and ch == '\n':
            pieces.append('\\n')
            skip_plus = True
        else:
            pieces.append(ch)
    return "".join(pieces)
# Convert a JSON string into a more readable version with newlines in strings.
def to_extended_json(json_str):
    """Convert a strict JSON string to a more readable "extended JSON" form.

    The two-character escape '\\n' inside a string literal is replaced with a
    raw newline; everything outside strings is left untouched. Uses a
    one-character lookbehind to spot the backslash/'n' pair.
    """
    pieces = []
    inside_string = False
    pending = None  # Previous character, not yet emitted.
    for ch in json_str:
        if ch == '"':
            inside_string = not inside_string
        if inside_string and pending == '\\' and ch == 'n':
            # Collapse the '\','n' pair into a single raw newline.
            pending = None
            ch = '\n'
        if pending is not None:
            pieces.append(pending)
        pending = ch
    if pending is not None:
        pieces.append(pending)
    return "".join(pieces)
##################
# Initialization #
##################
def initialize_messages_json():
    """Create an initial messages.json for the current prompt.

    Reads the system message from <repo>/default_system_message.txt, builds the
    user prompt from prompts.json (optionally preprocessing with M5 and
    appending "needs" status fields and a "background" section), and writes the
    LLM API's initial message structure in extended-JSON form so a human can
    edit it before the API call. Calls fail() on any error.
    """
    # Initialize messages.json.
    # TODO: This is specific to the API and should be done only when the API is called? Hmmm... it is done here to enable human edits before the API call.
    try:
        # Read the system message from <repo>/default_system_message.txt.
        with open(repo_dir + "/default_system_message.txt") as file:
            system = file.read()
        # Initialize messages.json.
        with open("messages.json", "w") as message_file:
            prompt = prompts[prompt_id]["prompt"]
            # Search prompt string for "m5_" and use M5 if found.
            if prompt.find("m5_") != -1:
                prompt = processPromptWithM5(prompt, readStatus())
            # Add "needs" fields to the prompt.
            if "needs" in prompts[prompt_id]:
                prompt += "\n\n" + "Note that the following attributes have been determined about the Verilog code:"
                status = readStatus()
                for field in prompts[prompt_id]["needs"]:
                    # NOTE(review): assumes status values are strings; a non-string
                    # value would raise TypeError here (caught below) — confirm.
                    prompt += "\n " + field + ": " + status.get(field, "")
            message_obj = {}
            # If prompt has a "background" field, add it (first) to the message.
            if "background" in prompts[prompt_id]:
                message_obj["background"] = prompts[prompt_id]["background"]
            message_obj["prompt"] = prompt
            message = message_bundler.obj_to_request(message_obj)
            # Serialize the API's initial prompt structure as extended JSON
            # (raw newlines in strings) for human readability/editability.
            ejson_messages = to_extended_json(json.dumps(llm_api.initPrompt(system, message), indent=4))
            message_file.write(ejson_messages)
    except Exception as e:
        print("Error: Failed to initialize messages.json due to: " + str(e))
        fail()
# Initialize the conversion directory for the next refactoring step.
def init_refactoring_step():
global refactoring_step, mod_num, prompt_id
# Get sticky status from current refactoring step before creating next.
old_status = {}
if refactoring_step <= 0:
# Test that the code can be parsed by FEV.
if not run_fev(working_verilog_file_name, working_verilog_file_name, True):
print("Error: The original Verilog code failed to run through FEV flow.")
print("Debug using logs in \"fev\" directory.")
fail()
else:
old_status = readStatus()
refactoring_step += 1
mod_num = -1
# Find the next prompt that should be executed.
ok = False
while not ok:
prompt_id += 1
# Check if conditions.
if_ok = True # Prompt is okay to execute based on "if" conditions.
if "if" in prompts[prompt_id]:
if_ok = False
for field in prompts[prompt_id]["if"]:
# If the field is a string, make it an array of one string.