-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
156 lines (125 loc) · 3.84 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
from fastapi import FastAPI, Request
import uvicorn
import argparse
from raft.node import Node
import threading
from time import sleep, perf_counter
import os
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--port', required=True)
parser.add_argument('-nl', '--nodes', default="./nodes.txt")
parser.add_argument("-f", "--file", required=True)
parser.add_argument("-c", "--config", required=True)
args = parser.parse_args()
log_file = args.file
read_logs = []
if os.path.isfile(log_file):
f = open(log_file, "r+")
for line in f.readlines():
line = line.strip()
read_logs.append(line)
f.close()
else:
f = open(log_file, "w+")
f.close()
if not os.path.isfile(args.config):
f_config = open(args.config, "w+")
f_config.close()
nl = open(args.nodes, "r+")
node_list = []
s_id = -1
for i, line in enumerate(nl.readlines()):
line = line.strip()
if(int(line) == int(args.port)):
s_id = i
node_list.append((i, int(line)))
nl.close()
#sleep(int(args.start))
app = FastAPI()
node = Node(node_list, s_id, read_logs, log_file, args.config)
def StartApplication(app, port):
uvicorn.run(app, host="localhost", port=port)
@app.get("/hello")
def hello():
return {
"hello":"TAs"
}
#These API are used to message between nodes.
@app.post("/appendEntries")
async def appendEntries(message: Request):
message = await message.json()
node.election_start = perf_counter()
res = node.AppendEntriesRes(message)
return res
@app.post("/voteRequest")
async def voteRequest(record: Request):
parsed = await record.json()
res = node.VoteResponse(parsed)
return res
#These API are endpoints used by client
@app.post("/RegisterBrokerRecord")
async def registerBroker(record: Request):
parsed = await record.json()
print(parsed)
res = node.AppendLogEntries(parsed)
return res
@app.get("/RegisterBrokerRecord/{id}")
async def getBrokerByID(id: int):
print(id)
res = node.getBroker(id)
return res
@app.get("/RegisterBrokerRecord")
async def getBrokers():
res = node.getAllBrokers()
return res
@app.post("/topicRecord")
async def AddTopicRecord(record:Request):
parsed = await record.json()
res = node.AppendLogEntries(parsed)
return res
@app.get("/topicRecord/{TopicID}")
async def GetTopicRecord(TopicID):
res = node.GetTopicRecord(TopicID)
return res
@app.post("/producerIdsRecord")
async def ProducerIdsRecord(record:Request):
parsed = await record.json()
res = node.AppendLogEntries(parsed)
return res
@app.put("/BrokerRegistrationChangeBrokerRecord")
async def UpdateBrokerRecord(record:Request):
parsed = await record.json()
res = node.AppendLogEntries(parsed)
return res
@app.delete("/BrokerRegistrationChangeBrokerRecord")
async def UnregisterBroker(record:Request):
parsed = await record.json()
res = node.AppendLogEntries(parsed)
return res
@app.post("/partitionRecord")
async def CreatePartition(record:Request):
parsed = await record.json()
res = node.AppendLogEntries(parsed)
return res
@app.post("/partitionRecord/remove")
async def RemoveReplica(record:Request):
parsed = await record.json()
res = node.AppendLogEntries(parsed)
return res
@app.post("/partitionRecord/add")
async def AddReplica(record:Request):
parsed = await record.json()
res = node.AppendLogEntries(parsed)
return res
@app.post("/BrokerUpdates")
async def BrokerManagement(record:Request):
record = await record.json()
return node.sendBrokerUpdates(record)
@app.post("/ClientUpdates")
async def ClientManagement(record:Request):
record = await record.json()
return node.sendClientUpdates(record)
app_thread = threading.Thread(target=StartApplication, args=(app, int(args.port)))
app_thread.start()
node.StartElectionTimer()
app_thread.join()