-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathimagescale-q-m.py
executable file
·138 lines (117 loc) · 4.87 KB
/
imagescale-q-m.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python3
# Copyright © 2012-13 Qtrac Ltd. All rights reserved.
# This program or module is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version. It is provided for
# educational purposes and is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
import argparse
import collections
import math
import multiprocessing
import os
import sys
import Image
import Qtrac
# Outcome of processing one image: copied/scaled are 0-or-1 tallies and
# name is the path the image was written to.
Result = collections.namedtuple("Result", ["copied", "scaled", "name"])
# Totals for a whole run, plus whether the run was interrupted.
Summary = collections.namedtuple(
    "Summary", ["todo", "copied", "scaled", "canceled"])
def main():
    """Program entry point: parse options, scale the images, report totals."""
    # handle_commandline() returns
    # (size, smooth, source, target, concurrency), which is exactly the
    # positional argument order that scale() expects.
    options = handle_commandline()
    concurrency = options[-1]
    Qtrac.report("starting...")
    summarize(scale(*options), concurrency)
def handle_commandline():
    """Parse and validate the command line.

    Returns a ``(size, smooth, source, target, concurrency)`` tuple in
    which ``source`` and ``target`` are absolute paths. The target
    directory is created if it does not already exist.

    Invalid input (non-positive concurrency or size, or identical source
    and target directories) is reported through ``parser.error()``, which
    prints a usage message and exits.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--concurrency", type=int,
            default=multiprocessing.cpu_count(),
            help="specify the concurrency (for debugging and "
                "timing) [default: %(default)d]")
    parser.add_argument("-s", "--size", default=400, type=int,
            help="make a scaled image that fits the given dimension "
                "[default: %(default)d]")
    parser.add_argument("-S", "--smooth", action="store_true",
            help="use smooth scaling (slow but good for text)")
    parser.add_argument("source",
            help="the directory containing the original .xpm images")
    parser.add_argument("target",
            help="the directory for the scaled .xpm images")
    args = parser.parse_args()
    # With no worker processes nothing would ever consume the job queue,
    # so waiting for the jobs to finish would block forever.
    if args.concurrency < 1:
        parser.error("concurrency must be at least 1")
    # The size is used as a divisor when computing the scale factor.
    if args.size < 1:
        parser.error("size must be at least 1")
    source = os.path.abspath(args.source)
    target = os.path.abspath(args.target)
    if source == target:
        # error() is a method of the parser; the Namespace returned by
        # parse_args() has no such attribute.
        parser.error("source and target must be different")
    if not os.path.exists(target):
        os.makedirs(target)
    return args.size, args.smooth, source, target, args.concurrency
def scale(size, smooth, source, target, concurrency):
    """Process every file in source using worker processes.

    Starts the workers, queues one job per directory entry, waits for the
    job queue to be fully processed (or for Ctrl+C), then tallies the
    results the workers reported. Returns a Summary of
    (todo, copied, scaled, canceled).
    """
    job_queue = multiprocessing.JoinableQueue()
    result_queue = multiprocessing.Queue()
    create_processes(size, smooth, job_queue, result_queue, concurrency)
    todo = add_jobs(source, target, job_queue)
    canceled = False
    try:
        job_queue.join()
    except KeyboardInterrupt:  # May not work on Windows
        Qtrac.report("canceling...")
        canceled = True
    # Drain whatever results are available. The original author considers
    # polling empty() safe here because join() has returned; after a
    # cancel only the results produced so far are counted.
    outcomes = []
    while not result_queue.empty():
        outcomes.append(result_queue.get_nowait())
    copied = sum(outcome.copied for outcome in outcomes)
    scaled = sum(outcome.scaled for outcome in outcomes)
    return Summary(todo, copied, scaled, canceled)
def create_processes(size, smooth, jobs, results, concurrency):
    """Start `concurrency` daemon processes, each running worker().

    Every process receives the same size/smooth settings and shares the
    jobs and results queues.
    """
    worker_args = (size, smooth, jobs, results)
    for _ in range(concurrency):
        child = multiprocessing.Process(target=worker, args=worker_args)
        # Daemon processes are terminated when the parent exits, so the
        # endlessly looping workers never need an explicit shutdown.
        child.daemon = True
        child.start()
def worker(size, smooth, jobs, results):
    """Loop forever, taking (source, target) path pairs from jobs.

    Each pair is handed to scale_one(); the outcome is reported and put
    on the results queue. An Image.Error is reported instead of being
    propagated. task_done() is called on the jobs queue at the end of
    every iteration, whether or not the job succeeded.
    """
    while True:
        try:
            source_path, target_path = jobs.get()
            try:
                outcome = scale_one(size, smooth, source_path, target_path)
                action = "copied" if outcome.copied else "scaled"
                filename = os.path.basename(outcome.name)
                Qtrac.report("{} {}".format(action, filename))
                results.put(outcome)
            except Image.Error as err:
                Qtrac.report(str(err), True)
        finally:
            # Always acknowledge the job so that a join() on the queue
            # can complete even when the image could not be processed.
            jobs.task_done()
def add_jobs(source, target, jobs):
    """Queue one job per entry in the source directory.

    Each job is a ``(source_path, target_path)`` tuple where both paths
    use the entry's name, joined onto ``source`` and ``target``
    respectively.

    Returns the number of jobs queued, which is 0 for an empty directory.
    """
    # Pre-initialize the count: the loop variable is never bound when the
    # directory is empty, and returning it would raise UnboundLocalError.
    todo = 0
    for todo, name in enumerate(os.listdir(source), start=1):
        sourceImage = os.path.join(source, name)
        targetImage = os.path.join(target, name)
        jobs.put((sourceImage, targetImage))
    return todo
def scale_one(size, smooth, sourceImage, targetImage):
    """Write a version of sourceImage that fits within size to targetImage.

    An image whose width and height are both at most size is saved
    unchanged and reported as copied. Otherwise it is shrunk, using
    Image.scale() when smooth is true and Image.subsample() when it is
    not, and reported as scaled.

    Returns a Result whose name is targetImage.
    """
    original = Image.from_file(sourceImage)
    fits = original.width <= size and original.height <= size
    if fits:
        original.save(targetImage)
        return Result(copied=1, scaled=0, name=targetImage)
    if smooth:
        # The smaller of the two ratios keeps both dimensions within size.
        factor = min(size / original.width, size / original.height)
        resized = original.scale(factor)
    else:
        # Round the larger ratio up to get a whole-number stride.
        ratio = max(original.width / size, original.height / size)
        stride = int(math.ceil(ratio))
        resized = original.subsample(stride)
    resized.save(targetImage)
    return Result(copied=0, scaled=1, name=targetImage)
def summarize(summary, concurrency):
    """Report a one-line summary of the run, followed by a blank line.

    The line lists the copied and scaled counts, the number skipped (only
    when todo differs from copied + scaled), the number of processes
    used, and a " [canceled]" marker if the run was canceled.
    """
    parts = ["copied {} scaled {} ".format(summary.copied, summary.scaled)]
    skipped = summary.todo - (summary.copied + summary.scaled)
    if skipped:
        parts.append("skipped {} ".format(skipped))
    parts.append("using {} processes".format(concurrency))
    if summary.canceled:
        parts.append(" [canceled]")
    Qtrac.report("".join(parts))
    print()
# Run main() only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()