-
Notifications
You must be signed in to change notification settings - Fork 574
Expand file tree
/
Copy pathcore.py
More file actions
436 lines (389 loc) · 16.3 KB
/
core.py
File metadata and controls
436 lines (389 loc) · 16.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
"""
Tests for core and those that do not work outside
(because of import error for example)
"""
import atexit
import os
import pickle # nosec
import Queue
import random # nosec
import shutil
import socket
import string
import sys
import threading
import time
import unittest
import protocol
import state
import helper_sent
import helper_addressbook
from bmconfigparser import config
from helper_msgcoding import MsgEncode, MsgDecode
from helper_sql import sqlQuery
from network import asyncore_pollchoose as asyncore, knownnodes
from network.bmproto import BMProto
from network.connectionpool import BMConnectionPool
from network.node import Node, Peer
from network.tcp import Socks4aBMConnection, Socks5BMConnection, TCPConnection
from queues import excQueue
from version import softwareVersion
from common import cleanup
try:
socket.socket().bind(('127.0.0.1', 9050))
tor_port_free = True
except (OSError, socket.error):
tor_port_free = False
frozen = getattr(sys, 'frozen', None)
knownnodes_file = os.path.join(state.appdata, 'knownnodes.dat')
def pickle_knownnodes():
"""Generate old style pickled knownnodes.dat"""
now = time.time()
with open(knownnodes_file, 'wb') as dst:
pickle.dump({
stream: {
Peer(
'%i.%i.%i.%i' % tuple([
random.randint(1, 255) for i in range(4)]),
8444): {'lastseen': now, 'rating': 0.1}
for i in range(1, 4) # 3 test nodes
}
for stream in range(1, 4) # 3 test streams
}, dst)
class TestCore(unittest.TestCase):
"""Test case, which runs in main pybitmessage thread"""
addr = 'BM-2cVvkzJuQDsQHLqxRXc6HZGPLZnkBLzEZY'
def tearDown(self):
"""Reset possible unexpected settings after test"""
knownnodes.addKnownNode(1, Peer('127.0.0.1', 8444), is_self=True)
config.remove_option('bitmessagesettings', 'dontconnect')
config.remove_option('bitmessagesettings', 'onionservicesonly')
config.set('bitmessagesettings', 'socksproxytype', 'none')
def test_msgcoding(self):
"""test encoding and decoding (originally from helper_msgcoding)"""
msg_data = {
'subject': ''.join(
random.choice(string.ascii_lowercase + string.digits) # nosec
for _ in range(40)),
'body': ''.join(
random.choice(string.ascii_lowercase + string.digits) # nosec
for _ in range(10000))
}
obj1 = MsgEncode(msg_data, 1)
obj2 = MsgEncode(msg_data, 2)
obj3 = MsgEncode(msg_data, 3)
# print "1: %i 2: %i 3: %i" % (
# len(obj1.data), len(obj2.data), len(obj3.data))
obj1e = MsgDecode(1, obj1.data)
# no subject in trivial encoding
self.assertEqual(msg_data['body'], obj1e.body)
obj2e = MsgDecode(2, obj2.data)
self.assertEqual(msg_data['subject'], obj2e.subject)
self.assertEqual(msg_data['body'], obj2e.body)
obj3e = MsgDecode(3, obj3.data)
self.assertEqual(msg_data['subject'], obj3e.subject)
self.assertEqual(msg_data['body'], obj3e.body)
try:
MsgEncode({'body': 'A msg with no subject'}, 3)
except Exception as e:
self.fail(
'Exception %s while trying to encode message'
' with no subject!' % e
)
@unittest.skip('Bad environment for asyncore.loop')
def test_tcpconnection(self):
"""initial fill script from network.tcp"""
config.set('bitmessagesettings', 'dontconnect', 'true')
try:
for peer in (Peer("127.0.0.1", 8448),):
direct = TCPConnection(peer)
while asyncore.socket_map:
print("loop, state = %s" % direct.state)
asyncore.loop(timeout=10, count=1)
except: # noqa:E722
self.fail('Exception in test loop')
def _load_knownnodes(self, filepath):
with knownnodes.knownNodesLock:
shutil.copyfile(filepath, knownnodes_file)
try:
knownnodes.readKnownNodes()
except AttributeError as e:
self.fail('Failed to load knownnodes: %s' % e)
@staticmethod
def _wipe_knownnodes():
with knownnodes.knownNodesLock:
knownnodes.knownNodes = {stream: {} for stream in range(1, 4)}
@staticmethod
def _outdate_knownnodes():
with knownnodes.knownNodesLock:
for nodes in knownnodes.knownNodes.itervalues():
for node in nodes.itervalues():
node['lastseen'] -= 2419205 # older than 28 days
def test_knownnodes_pickle(self):
"""ensure that 3 nodes was imported for each stream"""
pickle_knownnodes()
self._wipe_knownnodes()
knownnodes.readKnownNodes()
for nodes in knownnodes.knownNodes.itervalues():
self_count = n = 0
for n, node in enumerate(nodes.itervalues()):
if node.get('self'):
self_count += 1
self.assertEqual(n - self_count, 2)
def test_knownnodes_default(self):
"""test adding default knownnodes if nothing loaded"""
cleanup(files=('knownnodes.dat',))
self._wipe_knownnodes()
knownnodes.readKnownNodes()
self.assertGreaterEqual(
len(knownnodes.knownNodes[1]), len(knownnodes.DEFAULT_NODES))
def test_0_cleaner(self):
"""test knownnodes starvation leading to IndexError in Asyncore"""
self._outdate_knownnodes()
# time.sleep(303) # singleCleaner wakes up every 5 min
knownnodes.cleanupKnownNodes()
self.assertTrue(knownnodes.knownNodes[1])
while True:
try:
thread, exc = excQueue.get(block=False)
except Queue.Empty:
return
if thread == 'Asyncore' and isinstance(exc, IndexError):
self.fail("IndexError because of empty knownNodes!")
def _initiate_bootstrap(self):
config.set('bitmessagesettings', 'dontconnect', 'true')
self._wipe_knownnodes()
knownnodes.addKnownNode(1, Peer('127.0.0.1', 8444), is_self=True)
knownnodes.cleanupKnownNodes()
time.sleep(5)
def _check_connection(self, full=False):
"""
Check if there is at least one outbound connection to remote host
with name not starting with "bootstrap" in 6 minutes at most,
fail otherwise.
"""
_started = time.time()
config.remove_option('bitmessagesettings', 'dontconnect')
proxy_type = config.safeGet(
'bitmessagesettings', 'socksproxytype')
if proxy_type == 'SOCKS5':
connection_base = Socks5BMConnection
elif proxy_type == 'SOCKS4a':
connection_base = Socks4aBMConnection
else:
connection_base = TCPConnection
c = 360
while c > 0:
time.sleep(1)
c -= 2
for peer, con in BMConnectionPool().outboundConnections.iteritems():
if (
peer.host.startswith('bootstrap')
or peer.host == 'quzwelsuziwqgpt2.onion'
):
if c < 60:
self.fail(
'Still connected to bootstrap node %s after %.2f'
' seconds' % (peer, time.time() - _started))
c += 1
break
else:
self.assertIsInstance(con, connection_base)
self.assertNotEqual(peer.host, '127.0.0.1')
if full and not con.fullyEstablished:
continue
return
self.fail(
'Failed to connect during %.2f sec' % (time.time() - _started))
def _check_knownnodes(self):
for stream in knownnodes.knownNodes.itervalues():
for peer in stream:
if peer.host.startswith('bootstrap'):
self.fail(
'Bootstrap server in knownnodes: %s' % peer.host)
def test_dontconnect(self):
"""all connections are closed 5 seconds after setting dontconnect"""
self._initiate_bootstrap()
self.assertEqual(len(BMConnectionPool().connections()), 0)
def test_connection(self):
"""test connection to bootstrap servers"""
self._initiate_bootstrap()
for port in [8080, 8444]:
for item in socket.getaddrinfo(
'bootstrap%s.bitmessage.org' % port, 80):
try:
addr = item[4][0]
socket.inet_aton(item[4][0])
except (TypeError, socket.error):
continue
else:
knownnodes.addKnownNode(1, Peer(addr, port))
self._check_connection(True)
def test_bootstrap(self):
"""test bootstrapping"""
config.set('bitmessagesettings', 'socksproxytype', 'none')
self._initiate_bootstrap()
self._check_connection()
self._check_knownnodes()
# backup potentially enough knownnodes
knownnodes.saveKnownNodes()
with knownnodes.knownNodesLock:
shutil.copyfile(knownnodes_file, knownnodes_file + '.bak')
@unittest.skipIf(tor_port_free, 'no running tor detected')
def test_bootstrap_tor(self):
"""test bootstrapping with tor"""
config.set('bitmessagesettings', 'socksproxytype', 'SOCKS5')
self._initiate_bootstrap()
self._check_connection()
self._check_knownnodes()
@unittest.skipIf(tor_port_free, 'no running tor detected')
def test_onionservicesonly(self):
"""ensure bitmessage doesn't try to connect to non-onion nodes
if onionservicesonly set, wait at least 3 onion nodes
"""
config.set('bitmessagesettings', 'socksproxytype', 'SOCKS5')
config.set('bitmessagesettings', 'onionservicesonly', 'true')
self._load_knownnodes(knownnodes_file + '.bak')
if len([
node for node in knownnodes.knownNodes[1]
if node.host.endswith('.onion')
]) < 3: # generate fake onion nodes if have not enough
with knownnodes.knownNodesLock:
for f in ('a', 'b', 'c', 'd'):
knownnodes.addKnownNode(1, Peer(f * 16 + '.onion', 8444))
config.remove_option('bitmessagesettings', 'dontconnect')
tried_hosts = set()
for _ in range(360):
time.sleep(1)
for peer in BMConnectionPool().outboundConnections:
if peer.host.endswith('.onion'):
tried_hosts.add(peer.host)
else:
if not peer.host.startswith('bootstrap'):
self.fail(
'Found non onion hostname %s in outbound'
'connections!' % peer.host)
if len(tried_hosts) > 2:
return
self.fail('Failed to find at least 3 nodes to connect within 360 sec')
@unittest.skipIf(frozen, 'skip fragile test')
def test_udp(self):
"""check default udp setting and presence of Announcer thread"""
self.assertTrue(
config.safeGetBoolean('bitmessagesettings', 'udp'))
for thread in threading.enumerate():
if thread.name == 'Announcer': # find Announcer thread
break
else:
return self.fail('No Announcer thread found')
@staticmethod
def _decode_msg(data, pattern):
proto = BMProto()
proto.bm_proto_reset()
proto.payload = data[protocol.Header.size:]
return proto.decode_payload_content(pattern)
def test_version(self):
"""check encoding/decoding of the version message"""
# with single stream
msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1])
decoded = self._decode_msg(msg, "IQQiiQlsLv")
peer, _, ua, streams = self._decode_msg(msg, "IQQiiQlsLv")[4:]
self.assertEqual(
peer, Node(11 if state.dandelion else 3, '127.0.0.1', 8444))
self.assertEqual(ua, '/PyBitmessage:' + softwareVersion + '/')
self.assertEqual(streams, [1])
# with multiple streams
msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1, 2, 3])
decoded = self._decode_msg(msg, "IQQiiQlslv")
peer, _, ua = decoded[4:7]
streams = decoded[7:]
self.assertEqual(streams, [1, 2, 3])
def test_insert_method_msgid(self):
"""Test insert method of helper_sent module with message sending"""
fromAddress = 'BM-2cTrmD22fLRrumi3pPLg1ELJ6PdAaTRTdfg'
toAddress = 'BM-2cUGaEcGz9Zft1SPAo8FJtfzyADTpEgU9U'
message = 'test message'
subject = 'test subject'
result = helper_sent.insert(
toAddress=toAddress, fromAddress=fromAddress,
subject=subject, message=message
)
queryreturn = sqlQuery(
'''select msgid from sent where ackdata=?''', result)
self.assertNotEqual(queryreturn[0][0] if queryreturn else '', '')
column_type = sqlQuery(
'''select typeof(msgid) from sent where ackdata=?''', result)
self.assertEqual(column_type[0][0] if column_type else '', 'text')
@unittest.skipIf(frozen, 'not packed test_pattern into the bundle')
def test_old_knownnodes_pickle(self):
"""Testing old (v0.6.2) version knownnodes.dat file"""
try:
self._load_knownnodes(
os.path.join(
os.path.abspath(os.path.dirname(__file__)),
'test_pattern', 'knownnodes.dat'))
except self.failureException:
raise
finally:
cleanup(files=('knownnodes.dat',))
@staticmethod
def delete_address_from_addressbook(address):
"""Clean up addressbook"""
sqlQuery('''delete from addressbook where address=?''', address)
def test_add_same_address_twice_in_addressbook(self):
"""checking same address is added twice in addressbook"""
self.assertTrue(
helper_addressbook.insert(label='test1', address=self.addr))
self.assertFalse(
helper_addressbook.insert(label='test1', address=self.addr))
self.delete_address_from_addressbook(self.addr)
def test_is_address_present_in_addressbook(self):
"""checking is address added in addressbook or not"""
helper_addressbook.insert(label='test1', address=self.addr)
queryreturn = sqlQuery(
'select count(*) from addressbook where address=?', self.addr)
self.assertEqual(queryreturn[0][0], 1)
self.delete_address_from_addressbook(self.addr)
def test_adding_two_same_case_sensitive_addresses(self):
"""Testing same case sensitive address store in addressbook"""
address1 = 'BM-2cVWtdUzPwF7UNGDrZftWuHWiJ6xxBpiSP'
address2 = 'BM-2CvwTDuZpWf7ungdRzFTwUhwIj6XXbPIsp'
self.assertTrue(
helper_addressbook.insert(label='test1', address=address1))
self.assertTrue(
helper_addressbook.insert(label='test2', address=address2))
self.delete_address_from_addressbook(address1)
self.delete_address_from_addressbook(address2)
def run():
"""Starts all tests intended for core run"""
loader = unittest.defaultTestLoader
loader.sortTestMethodsUsing = None
suite = loader.loadTestsFromTestCase(TestCore)
if frozen:
try:
from pybitmessage import tests
suite.addTests(loader.loadTestsFromModule(tests))
except ImportError:
pass
try:
from pyelliptic import tests
suite.addTests(loader.loadTestsFromModule(tests))
except ImportError:
pass
try:
import bitmessageqt.tests
from xvfbwrapper import Xvfb
except ImportError:
Xvfb = None
else:
qt_tests = loader.loadTestsFromModule(bitmessageqt.tests)
suite.addTests(qt_tests)
def keep_exc(ex_cls, exc, tb): # pylint: disable=unused-argument
"""Own exception hook for test cases"""
excQueue.put(('tests', exc))
sys.excepthook = keep_exc
if Xvfb:
vdisplay = Xvfb(width=1024, height=768)
vdisplay.start()
atexit.register(vdisplay.stop)
return unittest.TextTestRunner(verbosity=2).run(suite)