1 import sys
2 import time
3 import threading
4 from itertools import izip, count
5
def _reraise(exc_type, exc_value, exc_tb):
    """
    Re-raise an exception captured with sys.exc_info(), keeping the
    traceback of the place where it was originally raised.
    """
    if sys.version_info[0] >= 3:
        raise exc_value.with_traceback(exc_tb)
    # Python 2 only: the three-argument raise is a syntax error on
    # Python 3, so it is hidden from the compiler inside a string.
    exec("raise exc_type, exc_value, exc_tb")


def foreach(f, l, threads=3, return_=False):
    """
    Apply f to each element of l, in parallel.

    f       -- callable taking one element of l.
    l       -- any iterable; elements are pulled from it one at a time.
    threads -- number of worker threads.  With threads <= 1 everything
               runs sequentially in the calling thread.
    return_ -- if true, return the list of results in input order;
               otherwise the results of f are discarded and None is
               returned.

    If f (or the iteration over l itself) raises, the workers stop
    picking up new elements and, once all of them have finished, the
    first recorded exception is re-raised in the calling thread with
    its original traceback.
    """
    if threads <= 1:
        # Sequential fallback: no threads, exceptions propagate directly.
        if return_:
            return [f(v) for v in l]
        for v in l:
            f(v)
        return None

    lock = threading.Lock()   # guards both the iterator and `exceptions`
    exceptions = []           # sys.exc_info() triples, in order of capture
    results = {}              # input index -> f(element), when return_
    # enumerate() goes through iter(), so objects that are iterable only
    # via __getitem__ work here just as they do in the sequential branch.
    items = enumerate(l)

    def runall():
        while True:
            with lock:
                if exceptions:
                    # Somebody already failed: do not start new work.
                    return
                try:
                    index, item = next(items)
                except StopIteration:
                    return
                except:  # noqa: E722 - deliberate, re-raised by the caller
                    # The iterable itself failed; report it instead of
                    # letting this worker die silently.
                    exceptions.append(sys.exc_info())
                    return
            try:
                value = f(item)
                if return_:
                    results[index] = value
            except:  # noqa: E722 - deliberate, re-raised by the caller
                failure = sys.exc_info()
                with lock:
                    exceptions.append(failure)

    workers = [threading.Thread(target=runall) for _ in range(threads)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    if exceptions:
        _reraise(*exceptions[0])
    if return_:
        # Workers finish in arbitrary order; restore the input order.
        return [results[index] for index in sorted(results)]
    return None
67
def parallel_map(f, l, threads=3):
    """
    Map f over the elements of l and return the list of results in
    input order.  Thin wrapper around foreach() with return_=True;
    threads is passed through unchanged.
    """
    return foreach(f, l, return_=True, threads=threads)
70
if __name__ == '__main__':
    # Demo 1: every call succeeds; the numbers 0..9 are printed by the
    # worker threads, each call taking half a second.
    def f(x):
        print(x)
        time.sleep(0.5)
    foreach(f, range(10))

    # Demo 2: every call fails after printing its argument; foreach is
    # expected to re-raise one of the ValueErrors in the main thread.
    def g(x):
        time.sleep(0.5)
        print(x)
        raise ValueError(x)
    foreach(g, range(10))
, as shown below in the list of files. Do
link, since this is subject to change and can break easily.