# mini_dfs.py (forked from ZhaoyangLiu-Leo/mini-DFS)
# -*- coding: utf-8 -*-
from const_share import *
from random import choice
import os
import math
import pickle
from enum import Enum
import threading
import sys

operation_names = ('put', 'read', 'fetch', 'quit', 'ls')
OPERATION = Enum('OPERATION', operation_names)

# Global variables shared between the NameNode and the DataNodes
global_server_block_map = {}
global_read_block = None
global_read_offset = None
global_read_count = None
global_cmd_flag = False
global_file_id = None
global_file_path = None
global_cmd_type = None
global_fetch_savepath = None
global_fetch_servers = []
global_fetch_blocks = None

# ---------------- Events that coordinate the threads ----------------
name_event = threading.Event()
ls_event = threading.Event()
read_event = threading.Event()
data_events = []
main_events = []
for j in range(NUM_DATA_SERVER):
    data_events.append(threading.Event())
    main_events.append(threading.Event())
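# The threads hand work off through these events: the main loop sets
# name_event, the NameNode sets data_events[i] for the nodes involved, and
# each DataNode signals completion back through main_events[i] (put/fetch),
# read_event (read) or ls_event (ls).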

def add_block_2_server(server_id, block, offset, count):
    """Record that `server_id` must store `count` bytes of the input file,
    starting at `offset`, as block `block`."""
    global global_server_block_map
    if server_id not in global_server_block_map:
        global_server_block_map[server_id] = []
    global_server_block_map[server_id].append((block, offset, count))
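# For illustration: add_block_2_server(2, '0-part-1', BLOCK_SIZE, 123) records
# that server 2 must write bytes [BLOCK_SIZE, BLOCK_SIZE + 123) of the input
# file as block '0-part-1' (the values here are hypothetical).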

def process_cmd(cmd):
    """
    Parse a command line; record the requested operation and report
    whether the command is valid.
    """
    global global_file_path, global_file_id, global_cmd_type, global_fetch_savepath
    global global_read_offset, global_read_count
    cmds = cmd.split()
    flag = False
    if len(cmds) >= 1 and cmds[0] in operation_names:
        if cmds[0] == operation_names[0]:  # put
            if len(cmds) != 2:
                print('Usage: put source_file_path')
            elif not os.path.isfile(cmds[1]):
                print('Error: input file does not exist')
            else:
                global_file_path = cmds[1]
                global_cmd_type = OPERATION.put
                flag = True
        elif cmds[0] == operation_names[1]:  # read
            if len(cmds) != 4:
                print('Usage: read file_id offset count')
            else:
                try:
                    global_file_id = int(cmds[1])
                    global_read_offset = int(cmds[2])
                    global_read_count = int(cmds[3])
                except ValueError:
                    print('Error: file_id, offset and count should be integers')
                else:
                    global_cmd_type = OPERATION.read
                    flag = True
        elif cmds[0] == operation_names[2]:  # fetch
            if len(cmds) != 3:
                print('Usage: fetch file_id save_path')
            else:
                global_fetch_savepath = cmds[2]
                # A bare file name saves into the current directory
                save_dir = os.path.dirname(global_fetch_savepath) or '.'
                if not os.path.exists(save_dir):
                    print('Error: input save_path does not exist')
                else:
                    try:
                        global_file_id = int(cmds[1])
                    except ValueError:
                        print('Error: file_id should be an integer')
                    else:
                        global_cmd_type = OPERATION.fetch
                        flag = True
        elif cmds[0] == operation_names[3]:  # quit
            if len(cmds) != 1:
                print('Usage: quit')
            else:
                flag = True
                global_cmd_type = OPERATION.quit
                start_stop_info('Stop')
                print("Bye: Exiting miniDFS...")
                os._exit(0)
        elif cmds[0] == operation_names[4]:  # ls
            if len(cmds) != 1:
                print('Usage: ls')
            else:
                flag = True
                global_cmd_type = OPERATION.ls
        else:
            pass
    else:
        print('Usage: put|read|fetch|quit|ls')
    return flag
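# For illustration, commands the parser above accepts look like:
#   put ./local_file.txt     -> store a local file in the DFS
#   read 0 128 16            -> print 16 bytes of file 0 starting at offset 128
#   fetch 0 /tmp/copy.txt    -> download file 0 to a local path
# (the file names and paths here are hypothetical)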

class NameNode(threading.Thread):
    """
    Name server: dispatches tasks to the data nodes and implements
    ls, read and fetch.
    """
    def __init__(self, name):
        super(NameNode, self).__init__(name=name)
        self.metas = None
        self.id_block_map = None
        self.id_file_map = None
        self.block_server_map = None
        self.last_file_id = -1
        self.last_data_server_id = -1
        self.load_meta()

    def run(self):
        global global_cmd_flag, global_cmd_type
        while True:
            # Loop forever, waiting for the next runnable command
            name_event.wait()
            if global_cmd_flag:
                if global_cmd_type == OPERATION.put:
                    self.generate_split()
                elif global_cmd_type == OPERATION.read:
                    self.assign_read_work()
                elif global_cmd_type == OPERATION.fetch:
                    self.assign_fetch_work()
                elif global_cmd_type == OPERATION.ls:
                    self.list_dfs_files()
                else:
                    pass
            name_event.clear()
            # print("namenode completed.")

    def load_meta(self):
        """
        Load the name node's metadata from disk, or initialize it on first run.
        """
        if not os.path.isfile(NAME_NODE_META_PATH):
            self.metas = {
                'id_block_map': {},
                'id_len_map': {},
                'block_server_map': {},
                'last_file_id': -1,
                'last_data_server_id': -1
            }
        else:
            with open(NAME_NODE_META_PATH, 'rb') as f:
                self.metas = pickle.load(f)
        self.id_block_map = self.metas['id_block_map']
        self.id_file_map = self.metas['id_len_map']
        self.block_server_map = self.metas['block_server_map']
        self.last_file_id = self.metas['last_file_id']
        self.last_data_server_id = self.metas['last_data_server_id']

    def update_meta(self):
        """
        Persist the name node's metadata; called after every put.
        """
        with open(NAME_NODE_META_PATH, 'wb') as f:
            self.metas['last_file_id'] = self.last_file_id
            self.metas['last_data_server_id'] = self.last_data_server_id
            pickle.dump(self.metas, f)

    def list_dfs_files(self):
        """
        Handle `ls`: print the metadata of every stored file.
        """
        print('total', len(self.id_file_map))
        for file_id, (file_name, file_len) in self.id_file_map.items():
            print(LS_PATTERN % (file_id, file_name, file_len))
        ls_event.set()

    def generate_split(self):
        """
        Split the input file into blocks and assign them, round-robin,
        to the data servers.
        """
        global global_server_block_map, global_file_path, global_file_id
        in_path = global_file_path
        file_name = in_path.split('/')[-1]
        self.last_file_id += 1
        server_id = (self.last_data_server_id + 1) % NUM_DATA_SERVER
        file_length = os.path.getsize(in_path)
        blocks = int(math.ceil(float(file_length) / BLOCK_SIZE))
        # Generate block names and record them in the <id, blocks> map
        self.id_block_map[self.last_file_id] = [BLOCK_PATTERN % (self.last_file_id, i) for i in range(blocks)]
        self.id_file_map[self.last_file_id] = (file_name, file_length)
        for i, block in enumerate(self.id_block_map[self.last_file_id]):
            self.block_server_map[block] = []
            # Replicate each chunk NUM_REPLICATION times, on distinct data nodes
            for j in range(NUM_REPLICATION):
                assign_server = (server_id + j) % NUM_DATA_SERVER
                self.block_server_map[block].append(assign_server)
                # Record the block-to-server assignment in the shared map as well
                size_in_block = BLOCK_SIZE if i < blocks - 1 else (
                    file_length - BLOCK_SIZE * (blocks - 1))
                add_block_2_server(assign_server, block, BLOCK_SIZE * i, size_in_block)
            server_id = (server_id + NUM_REPLICATION) % NUM_DATA_SERVER
        self.last_data_server_id = (server_id - 1) % NUM_DATA_SERVER
        self.update_meta()
        global_file_id = self.last_file_id
        for data_event in data_events:
            data_event.set()
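    # Worked example (assuming NUM_DATA_SERVER = 4 and NUM_REPLICATION = 3 in
    # const_share, starting from an empty namespace): a file of
    # 2.5 * BLOCK_SIZE bytes becomes 3 blocks; block 0 is replicated on
    # servers {0, 1, 2}, block 1 on {3, 0, 1}, block 2 on {2, 3, 0}, and the
    # final block stores the remaining 0.5 * BLOCK_SIZE bytes.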

    def assign_read_work(self):
        """
        Dispatch a read task to a concrete data node.
        """
        global global_file_id, global_read_block, global_read_offset, global_read_count
        file_id = global_file_id
        read_offset = global_read_offset
        read_count = global_read_count
        if file_id not in self.id_file_map:
            print('No such file with id =', file_id)
            read_event.set()
        elif (read_offset + read_count) > self.id_file_map[file_id][1]:
            print('The requested read goes past the end of the file; file size:', self.id_file_map[file_id][1])
            read_event.set()
        else:
            start_block = int(math.floor(read_offset / BLOCK_SIZE))
            space_left_in_block = (start_block + 1) * BLOCK_SIZE - read_offset
            if space_left_in_block < read_count:
                print('Cannot read across blocks')
                read_event.set()
            else:
                # Pick one of the data servers holding the block at random
                read_server_candidates = self.block_server_map[BLOCK_PATTERN % (file_id, start_block)]
                read_server_id = choice(read_server_candidates)
                global_read_block = BLOCK_PATTERN % (file_id, start_block)
                global_read_offset = read_offset - start_block * BLOCK_SIZE
                data_events[read_server_id].set()
                return True
        return False
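    # Note that a read may not span blocks: e.g. assuming BLOCK_SIZE = 4096,
    # `read 0 4090 10` would be rejected with 'Cannot read across blocks'
    # because only 6 bytes remain in block 0 after the offset.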

    def assign_fetch_work(self):
        """
        Dispatch a download (fetch) task.
        """
        global global_file_id, global_fetch_blocks, global_fetch_servers
        file_id = global_file_id
        if file_id not in self.id_file_map:
            print('No such file with id =', file_id)
            global_fetch_blocks = None
        else:
            file_blocks = self.id_block_map[file_id]
            global_fetch_blocks = len(file_blocks)
            # Reset the list so earlier fetches do not leak into this one,
            # then look up a server holding each block of the file
            global_fetch_servers = []
            for block in file_blocks:
                global_fetch_servers.append(self.block_server_map[block][0])
            for data_event in data_events:
                data_event.set()
            return True
        # File not found: wake the data nodes anyway so the main thread is released
        for data_event in data_events:
            data_event.set()
        return None

class DataNode(threading.Thread):
    """
    Data server: performs the actual work, i.e. writing blocks for put
    and serving read requests.
    """
    def __init__(self, server_id):
        super(DataNode, self).__init__(name='DataServer%s' % (server_id,))
        self._server_id = server_id

    def run(self):
        global global_cmd_flag, global_cmd_type, global_server_block_map
        while True:
            data_events[self._server_id].wait()
            if global_cmd_flag:
                if global_cmd_type == OPERATION.put and self._server_id in global_server_block_map:
                    self.save_file()
                elif global_cmd_type == OPERATION.read:
                    self.read_file()
                else:
                    pass
            data_events[self._server_id].clear()
            # Only put and fetch are awaited through main_events; setting the
            # event for read as well would let a later put proceed early.
            if global_cmd_type in (OPERATION.put, OPERATION.fetch):
                main_events[self._server_id].set()

    def save_file(self):
        """
        Write this node's assigned blocks of the input file to disk.
        """
        global global_server_block_map, global_file_path
        data_node_dir = DATA_NODE_DIR % (self._server_id,)
        # Binary mode keeps offsets and sizes correct for arbitrary file content
        with open(global_file_path, 'rb') as f_in:
            for block, offset, count in global_server_block_map[self._server_id]:
                f_in.seek(offset, 0)
                content = f_in.read(count)
                with open(data_node_dir + os.path.sep + block, 'wb') as f_out:
                    f_out.write(content)
                    f_out.flush()

    def read_file(self):
        """
        Read `count` bytes from the block file at the given offset and print them.
        """
        global global_read_block, global_read_offset, global_read_count
        read_path = (DATA_NODE_DIR % (self._server_id,)) + os.path.sep + global_read_block
        with open(read_path, 'rb') as f_in:
            f_in.seek(global_read_offset)
            content = f_in.read(global_read_count)
        print(content.decode(errors='replace'))
        read_event.set()

def run():
    # Create the dfs directory tree on first start
    if not os.path.isdir("dfs"):
        os.makedirs("dfs")
        for i in range(NUM_DATA_SERVER):
            os.makedirs("dfs/datanode%d" % i)
        os.makedirs("dfs/namenode")

    # Start the name server and the data servers
    name_server = NameNode('NameServer')
    name_server.start()
    data_servers = [DataNode(s_id) for s_id in range(NUM_DATA_SERVER)]
    for server in data_servers:
        server.start()

    global global_cmd_type, global_cmd_flag, global_file_id
    cmd_prompt = 'MiniDFS > '
    while True:
        cmd_str = input(cmd_prompt)
        global_cmd_flag = process_cmd(cmd_str)
        if global_cmd_flag:
            if global_cmd_type == OPERATION.quit:
                sys.exit(0)  # defensive; process_cmd already exits on quit
            name_event.set()
            if global_cmd_type == OPERATION.put:
                for i in range(NUM_DATA_SERVER):
                    main_events[i].wait()
                print('Put succeeded! File ID is %d' % (global_file_id,))
                global_server_block_map.clear()
                for i in range(NUM_DATA_SERVER):
                    main_events[i].clear()
            elif global_cmd_type == OPERATION.read:
                read_event.wait()
                read_event.clear()
            elif global_cmd_type == OPERATION.ls:
                ls_event.wait()
                ls_event.clear()
            elif global_cmd_type == OPERATION.fetch:
                for i in range(NUM_DATA_SERVER):
                    main_events[i].wait()
                if global_fetch_blocks is not None:
                    # Concatenate the first replica of each block into save_path
                    with open(global_fetch_savepath, mode='wb') as f_fetch:
                        for i in range(global_fetch_blocks):
                            server_id = global_fetch_servers[i]
                            block_file_path = (DATA_NODE_DIR % (server_id,)) + os.path.sep + (BLOCK_PATTERN % (global_file_id, i))
                            with open(block_file_path, 'rb') as block_file:
                                f_fetch.write(block_file.read())
                    print('Finished download!')
                for i in range(NUM_DATA_SERVER):
                    main_events[i].clear()
            else:
                pass

def start_stop_info(operation):
    print(operation, 'NameNode')
    for i in range(NUM_DATA_SERVER):
        print(operation, 'DataNode' + str(i))


if __name__ == '__main__':
    start_stop_info('Start')
    run()
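
# A sample session might look like this (file name and listing hypothetical,
# exact output depends on const_share settings such as LS_PATTERN):
#   MiniDFS > put ./notes.txt
#   Put succeeded! File ID is 0
#   MiniDFS > ls
#   total 1
#   ...
#   MiniDFS > quit
#   Stop NameNode
#   Stop DataNode0
#   ...
#   Bye: Exiting miniDFS...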