forked from bslatkin/effectivepython
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathitem_24.py
executable file
·182 lines (133 loc) · 4.17 KB
/
item_24.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#!/usr/bin/env python3
# Copyright 2014 Brett Slatkin, Pearson Education Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Preamble to mimick book environment
import logging
from pprint import pprint
from sys import stdout as STDOUT
# Example 1
class InputData(object):
def read(self):
raise NotImplementedError
# Example 2
class PathInputData(InputData):
def __init__(self, path):
super().__init__()
self.path = path
def read(self):
return open(self.path).read()
# Example 3
class Worker(object):
def __init__(self, input_data):
self.input_data = input_data
self.result = None
def map(self):
raise NotImplementedError
def reduce(self, other):
raise NotImplementedError
# Example 4
class LineCountWorker(Worker):
def map(self):
data = self.input_data.read()
self.result = data.count('\n')
def reduce(self, other):
self.result += other.result
# Example 5
import os
def generate_inputs(data_dir):
for name in os.listdir(data_dir):
yield PathInputData(os.path.join(data_dir, name))
# Example 6
def create_workers(input_list):
workers = []
for input_data in input_list:
workers.append(LineCountWorker(input_data))
return workers
# Example 7
from threading import Thread
def execute(workers):
threads = [Thread(target=w.map) for w in workers]
for thread in threads: thread.start()
for thread in threads: thread.join()
first, rest = workers[0], workers[1:]
for worker in rest:
first.reduce(worker)
return first.result
# Example 8
def mapreduce(data_dir):
inputs = generate_inputs(data_dir)
workers = create_workers(inputs)
return execute(workers)
# Example 9
from tempfile import TemporaryDirectory
import random
def write_test_files(tmpdir):
for i in range(100):
with open(os.path.join(tmpdir, str(i)), 'w') as f:
f.write('\n' * random.randint(0, 100))
with TemporaryDirectory() as tmpdir:
write_test_files(tmpdir)
result = mapreduce(tmpdir)
print('There are', result, 'lines')
# Example 10
class GenericInputData(object):
def read(self):
raise NotImplementedError
@classmethod
def generate_inputs(cls, config):
raise NotImplementedError
# Example 11
class PathInputData(GenericInputData):
def __init__(self, path):
super().__init__()
self.path = path
def read(self):
return open(self.path).read()
@classmethod
def generate_inputs(cls, config):
data_dir = config['data_dir']
for name in os.listdir(data_dir):
yield cls(os.path.join(data_dir, name))
# Example 12
class GenericWorker(object):
def __init__(self, input_data):
self.input_data = input_data
self.result = None
def map(self):
raise NotImplementedError
def reduce(self, other):
raise NotImplementedError
@classmethod
def create_workers(cls, input_class, config):
workers = []
for input_data in input_class.generate_inputs(config):
workers.append(cls(input_data))
return workers
# Example 13
class LineCountWorker(GenericWorker):
def map(self):
data = self.input_data.read()
self.result = data.count('\n')
def reduce(self, other):
self.result += other.result
# Example 14
def mapreduce(worker_class, input_class, config):
workers = worker_class.create_workers(input_class, config)
return execute(workers)
# Example 15
with TemporaryDirectory() as tmpdir:
write_test_files(tmpdir)
config = {'data_dir': tmpdir}
result = mapreduce(LineCountWorker, PathInputData, config)
print('There are', result, 'lines')