-
Notifications
You must be signed in to change notification settings - Fork 25
/
imagescale-c.py
executable file
·138 lines (115 loc) · 4.67 KB
/
imagescale-c.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python3
# Copyright © 2012-13 Qtrac Ltd. All rights reserved.
# This program or module is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version. It is provided for
# educational purposes and is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
import argparse
import collections
import math
import multiprocessing
import os
import sys
import Image
import Qtrac
# Outcome of processing one image: `todo`, `copied` and `scaled` are counts
# that results() adds to its running totals; `name` is the target file path.
Result = collections.namedtuple("Result", "todo copied scaled name")
def main():
    """Run the scaler from the command line and print a summary.

    A KeyboardInterrupt raised while scaling is caught so that the
    summary is still reported, marked as canceled.
    """
    size, smooth, source, target, concurrency = handle_commandline()
    Qtrac.report("starting...")
    try:
        scale(size, smooth, source, target, concurrency)
    except KeyboardInterrupt:
        Qtrac.report("canceling...")
        canceled = True
    else:
        canceled = False
    summarize(concurrency, canceled)
def handle_commandline():
    """Parse the command line and return the scaling settings.

    The target directory is created if it does not already exist.

    Returns:
        A tuple ``(size, smooth, source, target, concurrency)`` in which
        ``source`` and ``target`` are absolute paths.

    Raises:
        SystemExit: via ``parser.error`` (exit status 2) when the
            concurrency is less than 1 or when source and target refer
            to the same directory; also raised by argparse itself for
            malformed arguments.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--concurrency", type=int,
            default=multiprocessing.cpu_count(),
            help="specify the concurrency (for debugging and "
                "timing) [default: %(default)d]")
    parser.add_argument("-s", "--size", default=400, type=int,
            help="make a scaled image that fits the given dimension "
                "[default: %(default)d]")
    parser.add_argument("-S", "--smooth", action="store_true",
            help="use smooth scaling (slow but good for text)")
    parser.add_argument("source",
            help="the directory containing the original .xpm images")
    parser.add_argument("target",
            help="the directory for the scaled .xpm images")
    args = parser.parse_args()
    # A pipeline needs at least one scaler; zero or negative values would
    # otherwise produce no pipeline at all and crash once jobs are sent.
    if args.concurrency < 1:
        parser.error("concurrency must be at least 1")
    source = os.path.abspath(args.source)
    target = os.path.abspath(args.target)
    if source == target:
        # error() is a method of the parser, not of the parsed Namespace.
        parser.error("source and target must be different")
    # Check the same absolute path that is about to be created.
    if not os.path.exists(target):
        os.makedirs(target)
    return args.size, args.smooth, source, target, args.concurrency
def scale(size, smooth, source, target, concurrency):
    """Feed every job from get_jobs() into the scaler pipeline.

    Jobs are tagged round-robin with a scaler number in
    ``range(concurrency)`` so that each job is handled by exactly one
    scaler in the chain.
    """
    pipeline = create_pipeline(size, smooth, concurrency)
    jobs = get_jobs(source, target)
    for index, (source_image, target_image) in enumerate(jobs):
        who = index % concurrency
        pipeline.send((source_image, target_image, who))
def create_pipeline(size, smooth, concurrency):
    """Build a chain of ``concurrency`` scaler coroutines.

    Each new scaler is given the previously created one as its receiver,
    and all of them share a single results() sink. The last scaler
    created (the head of the chain) is returned; None is returned when
    ``concurrency`` produces an empty range.
    """
    sink = results()
    head = None
    for who in range(concurrency):
        # The newest scaler forwards jobs it does not own to the old head.
        head = scaler(head, sink, size, smooth, who)
    return head
def get_jobs(source, target):
    """Yield a (source path, target path) pair for each entry in source.

    Every name returned by os.listdir(source) is joined onto both the
    source and the target directory; no filtering is applied.
    """
    for filename in os.listdir(source):
        source_path = os.path.join(source, filename)
        target_path = os.path.join(target, filename)
        yield source_path, target_path
@Qtrac.coroutine
def scaler(receiver, sink, size, smooth, me):
    """Pipeline stage that scales the jobs addressed to ``me``.

    Each value sent in is a ``(sourceImage, targetImage, who)`` triple.
    A job whose ``who`` equals ``me`` is scaled with scale_one() and its
    Result is sent to ``sink``; an Image.Error raised while doing so is
    reported through Qtrac.report() and the job is dropped. Any other
    job is passed on to ``receiver`` unless ``receiver`` is None.
    """
    while True:
        source_image, target_image, who = (yield)
        if who != me:
            # Not ours: hand it down the chain, if there is anyone left.
            if receiver is not None:
                receiver.send((source_image, target_image, who))
            continue
        try:
            sink.send(scale_one(size, smooth, source_image, target_image))
        except Image.Error as err:
            Qtrac.report(str(err), True)
@Qtrac.coroutine
def results():
    """Sink coroutine that tallies Results and reports each one.

    Every Result sent in is added to the running ``todo``, ``copied`` and
    ``scaled`` totals, which are kept as attributes on ``results``, and a
    "copied <file>" or "scaled <file>" line is reported for it.
    """
    while True:
        outcome = (yield)
        results.todo += outcome.todo
        results.copied += outcome.copied
        results.scaled += outcome.scaled
        action = "copied" if outcome.copied else "scaled"
        filename = os.path.basename(outcome.name)
        Qtrac.report("{} {}".format(action, filename))


# Running totals, read later by summarize().
results.todo = 0
results.copied = 0
results.scaled = 0
def scale_one(size, smooth, sourceImage, targetImage):
    """Write a version of sourceImage that fits within size x size.

    An image that already fits is saved to targetImage unchanged and
    counted as copied. Otherwise it is reduced, either with
    Image.scale() when ``smooth`` is true or with Image.subsample()
    using an integer stride, then saved and counted as scaled.

    Returns:
        A Result with ``todo`` set to 1, exactly one of ``copied`` or
        ``scaled`` set to 1, and ``name`` set to targetImage.
    """
    image = Image.from_file(sourceImage)
    fits = image.width <= size and image.height <= size
    if fits:
        image.save(targetImage)
        return Result(todo=1, copied=1, scaled=0, name=targetImage)
    if smooth:
        # Use the smaller ratio so that both dimensions end up within size.
        factor = min(size / image.width, size / image.height)
        resized = image.scale(factor)
    else:
        # Round the larger ratio up so the subsampled image cannot exceed size.
        largest_ratio = max(image.width / size, image.height / size)
        resized = image.subsample(int(math.ceil(largest_ratio)))
    resized.save(targetImage)
    return Result(todo=1, copied=0, scaled=1, name=targetImage)
def summarize(concurrency, canceled):
    """Report the final copied/scaled/skipped totals.

    The totals are read from the attributes on ``results``. The skipped
    count is mentioned only when it is non-zero, and " [canceled]" is
    appended when ``canceled`` is true. A blank line is printed last.
    """
    copied = results.copied
    scaled = results.scaled
    parts = ["copied {} scaled {} ".format(copied, scaled)]
    skipped = results.todo - (copied + scaled)
    if skipped:
        parts.append("skipped {} ".format(skipped))
    parts.append("using {} coroutines".format(concurrency))
    if canceled:
        parts.append(" [canceled]")
    Qtrac.report("".join(parts))
    print()
# Run the scaler only when this file is executed as a script.
if __name__ == "__main__":
    main()