-
Notifications
You must be signed in to change notification settings - Fork 574
Expand file tree
/
Copy pathuploadthread.py
More file actions
69 lines (65 loc) · 2.43 KB
/
uploadthread.py
File metadata and controls
69 lines (65 loc) · 2.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""
`UploadThread` class definition
"""
import time
import random
import protocol
import state
import connectionpool
from randomtrackingdict import RandomTrackingDict
from network import dandelion_ins
from threads import StoppableThread
class UploadThread(StoppableThread):
    """
    Worker thread that sends peers the objects they have requested from us.
    """
    # A connection whose write buffer is already longer than this many
    # bytes is skipped for the current pass.
    maxBufSize = 2097152  # 2MB
    name = "Uploader"

    def run(self):
        """
        Serve pending upload requests until the thread is stopped.

        Each pass walks the established connections in shuffled order.
        For every eligible connection a batch of requested hashes is
        taken from its ``pendingUpload``, each one is wrapped into an
        'object' packet and the concatenated packets are appended to the
        connection's write buffer. When a whole pass uploads nothing,
        the thread waits on ``self.stop`` before starting the next pass.
        """
        while not self._stopped:
            sent_total = 0
            # Visit the established connections in random order
            peers = connectionpool.pool.establishedConnections()
            random.shuffle(peers)
            for peer in peers:
                current_time = time.time()
                # Leave alone peers whose skipUntil has not passed yet,
                # as well as peers whose write buffer is over the limit
                if (peer.skipUntil >= current_time
                        or len(peer.write_buf) > self.maxBufSize):
                    continue
                try:
                    wanted = peer.pendingUpload.randomKeys(
                        RandomTrackingDict.maxPending)
                except KeyError:
                    # randomKeys gave us nothing to work with for this peer
                    continue
                outgoing = bytearray()
                packed = 0
                for obj_hash in wanted:
                    # The request counts as handled, whatever happens next
                    del peer.pendingUpload[obj_hash]
                    is_stem = dandelion_ins.hasHash(obj_hash)
                    if is_stem and \
                            peer != dandelion_ins.objectChildStem(obj_hash):
                        # Known to dandelion, but this peer is not the
                        # child stem recorded for the object
                        peer.antiIntersectionDelay()
                        self.logger.info(
                            '%s asked for a stem object we didn\'t offer to it.',
                            peer.destination)
                        break
                    try:
                        outgoing.extend(protocol.CreatePacket(
                            'object', state.Inventory[obj_hash].payload))
                    except KeyError:
                        peer.antiIntersectionDelay()
                        self.logger.info(
                            '%s asked for an object we don\'t have.',
                            peer.destination)
                        break
                    else:
                        packed += 1
                # Packets gathered before a break are still sent
                if packed:
                    peer.append_write_buf(outgoing)
                    self.logger.debug(
                        '%s:%i Uploading %i objects',
                        peer.destination.host, peer.destination.port,
                        packed)
                    sent_total += packed
            if not sent_total:
                # Idle pass: wait on the stop object before retrying
                # (presumably an Event with a 1 second timeout -- confirm)
                self.stop.wait(1)