forked from Rustam-Z/python-programming
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpool.py
More file actions
65 lines (52 loc) · 1.91 KB
/
Copy pathpool.py
File metadata and controls
65 lines (52 loc) · 1.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
"""
Stores a set of initialized objects kept ready to use.
This pattern is used when creating an object is costly (and objects are
created frequently) but only a few are in use at a time. A pool caches
the instances that already exist, so the costly creation of a new
object can be skipped whenever one is available in the pool.
A pool allows a client to 'check out' an inactive object and to return
it later. If none is available, the pool creates one so that the client
does not have to wait.
http://stackoverflow.com/questions/1514120/python-implementation-of-the-object-pool-design-pattern
https://sourcemaking.com/design_patterns/object_pool
"""
class ObjectPool:
    """Check one item out of a queue-backed pool and hand it back afterwards.

    The pool is any object providing ``get()`` and ``put(item)`` (for
    example a ``queue.Queue``). An instance holds at most one checked-out
    item, exposed as the ``item`` attribute; ``None`` means that nothing
    is checked out at the moment.

    The item is put back into the queue either when the ``with`` block
    ends or when the instance is finalized, whichever happens first.
    """

    def __init__(self, queue, auto_get=False):
        """Remember the backing queue and optionally check an item out now.

        Args:
            queue: Object providing ``get()`` and ``put(item)``.
            auto_get: When true, take an item from ``queue`` immediately
                instead of waiting for ``__enter__``.
        """
        self._queue = queue
        # Define ``item`` before touching the queue: if ``get()`` raises,
        # ``__del__`` still runs on the half-built instance and must find
        # the attribute.
        self.item = None
        if auto_get:
            self.item = self._queue.get()

    def __enter__(self):
        """Return the checked-out item, fetching one if none is held yet."""
        if self.item is None:
            self.item = self._queue.get()
        return self.item

    def __exit__(self, Type, value, traceback):
        """Put the held item back into the queue.

        The exception details passed in by the ``with`` statement are
        ignored, and nothing truthy is returned, so an exception raised
        inside the block keeps propagating.
        """
        self._release()

    def __del__(self):
        """Put a still-held item back when the instance is finalized.

        This covers instances created with ``auto_get=True`` that are never
        used in a ``with`` block. When finalization happens is decided by
        the interpreter, not by this class.
        """
        self._release()

    def _release(self):
        """Return the held item (if any) to the queue and forget it."""
        # ``getattr`` keeps finalization quiet for instances whose
        # ``__init__`` never ran or never got as far as setting ``item``.
        held = getattr(self, "item", None)
        if held is not None:
            self._queue.put(held)
            self.item = None
def main():
    """Doctest walkthrough of ``ObjectPool``; the function itself does nothing.

    >>> import queue

    Helper that checks an item out eagerly and never returns it explicitly;
    the item goes back into the queue once ``pool`` is finalized:

    >>> def test_object(queue):
    ...     pool = ObjectPool(queue, True)
    ...     print('Inside func: {}'.format(pool.item))

    >>> sample_queue = queue.Queue()

    Used as a context manager, the item is back in the queue after the block:

    >>> sample_queue.put('yam')
    >>> with ObjectPool(sample_queue) as obj:
    ...     print('Inside with: {}'.format(obj))
    Inside with: yam

    >>> print('Outside with: {}'.format(sample_queue.get()))
    Outside with: yam

    Used through the helper, the item is back in the queue after the call:

    >>> sample_queue.put('sam')
    >>> test_object(sample_queue)
    Inside func: sam

    >>> print('Outside func: {}'.format(sample_queue.get()))
    Outside func: sam

    Nothing may be left over in the queue, so this prints nothing:

    >>> if not sample_queue.empty():
    ...     print(sample_queue.get())
    """
if __name__ == "__main__":
    # Executed as a script: check the examples embedded in the docstrings.
    from doctest import testmod

    testmod()