forked from whnet/ZhihuSpyder
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathproxyCore.py
More file actions
212 lines (179 loc) · 7.44 KB
/
proxyCore.py
File metadata and controls
212 lines (179 loc) · 7.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
from Proxy import validateData
from Proxy import fetchData
from Proxy import parseData
from Core.Logger import log
import time
import threading
import queue
import configparser
import logging
# Key names of the fields that make up a proxy record
PROXY_ID = 'id'
PROXY_IP = 'ip'
PROXY_PORT = 'port'
PROXY_ADDRESS = 'address'
PROXY_PROTOCOL = 'protocol'
PROXY_ALIVE_TIME = 'aliveTime'
PROXY_VALIDATE_TIME = 'validateTime'
# First page of the proxy listing to fetch (default; overwritten by ProxyDaemonThread.init())
FETCH_START_PAGE = 1
# Last page of the proxy listing to fetch; fetching wraps back to the start page after it
FETCH_END_PAGE = 5
# Target number of validated proxies kept in the pool
PROXY_POOL_SIZE = 10
# Seconds to sleep after a proxy pool scan before looking at the pool again
PROXY_POOL_SCAN_INTERVAL = 300
# Number of proxy validation threads started by the daemon thread
PROXY_VALIDATE_THREAD_NUM = 3
# Proxies waiting to be validated (LIFO queue, at most 300 entries)
unchecked_proxy_list = queue.LifoQueue(300)
# Pool of proxies that passed validation (FIFO queue, at most 100 entries)
proxy_pool = queue.Queue(100)
# Flag: True while the proxy pool is being scanned; validation threads pause while it is set
is_scanning = False
# Daemon (supervisor) thread of the proxy service
class ProxyDaemonThread(threading.Thread):
    """Supervisor thread that starts and watches the proxy worker threads.

    When run, it loads the configuration, starts
    ``PROXY_VALIDATE_THREAD_NUM`` validation threads and one pool-scan
    thread, and then checks every 180 seconds whether any of them has set
    its ``status`` attribute to ``'error'``; such threads are replaced by
    freshly started ones.
    """

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        """Load the configuration, start the workers and supervise them forever."""
        # Load the configuration first: it overwrites the module-level
        # settings (thread count, pool size, ...) that are read below.
        self.init()

        # Start the proxy validation threads.
        validate_thread_list = []
        for _ in range(PROXY_VALIDATE_THREAD_NUM):
            validate_thread = ProxyValidateThread()
            validate_thread_list.append(validate_thread)
            validate_thread.start()
            if log.isEnabledFor(logging.DEBUG):
                log.debug("代理验证线程启动")

        # Start the proxy pool scan thread.
        scan_thread = ProxyPoolScanThread()
        scan_thread.start()
        if log.isEnabledFor(logging.DEBUG):
            log.debug("代理池扫描线程启动")

        # Periodically look for workers that died with an error and restart them.
        while True:
            # Validation threads: failed entries are replaced in place, so
            # the list is never resized while it is being walked.
            restarted = self._restart_failed_threads(
                validate_thread_list, ProxyValidateThread)
            if log.isEnabledFor(logging.ERROR):
                for _ in range(restarted):
                    log.error('代理验证线程重新启动')

            # Pool scan thread.
            if scan_thread.status == 'error':
                scan_thread = ProxyPoolScanThread()
                scan_thread.start()
                if log.isEnabledFor(logging.ERROR):
                    log.error("代理池扫描线程重新启动")

            time.sleep(180)

    @staticmethod
    def _restart_failed_threads(threads, thread_factory):
        """Replace every failed thread in *threads* with a newly started one.

        A thread counts as failed when its ``status`` attribute equals
        ``'error'``. Each failed entry is overwritten at its own index by
        ``thread_factory()``, which is then started; healthy entries are
        left untouched.

        Args:
            threads: Mutable list of worker threads; modified in place.
            thread_factory: Zero-argument callable returning a new,
                not yet started worker.

        Returns:
            The number of threads that were replaced.
        """
        restarted = 0
        for index, thread in enumerate(threads):
            if thread.status == 'error':
                replacement = thread_factory()
                threads[index] = replacement
                replacement.start()
                restarted += 1
        return restarted

    # Initialisation: read the configuration file and apply it
    @staticmethod
    def init():
        """Read ``Proxy/Config/proxyConfiguration.conf`` and apply its ``proxy_core`` section.

        Sets the timeout/retry attributes on the ``validateData`` and
        ``fetchData`` modules and overwrites this module's paging, pool-size,
        scan-interval and thread-count settings. Every value is parsed as an
        integer.

        Raises:
            configparser.Error: if the section or one of the options is missing.
            ValueError: if an option value is not a valid integer.
        """
        section = "proxy_core"
        config = configparser.ConfigParser()
        config.read('Proxy/Config/proxyConfiguration.conf', encoding='utf8')

        # validateData settings
        validateData.CONNECT_TIMEOUT = config.getint(section, "proxyValidate_connectTimeout")
        validateData.NETWORK_RECONNECT_TIMES = config.getint(section, "proxyValidate_networkReconnectTimes")

        # fetchData settings
        fetchData.CONNECT_TIMEOUT = config.getint(section, "dataFetch_connectTimeout")
        fetchData.NETWORK_RECONNECT_INTERVAL = config.getint(section, "dataFetch_networkReconnectInterval")
        fetchData.NETWORK_RETRY_TIMES = config.getint(section, "dataFetch_networkReconnectionTimes")

        # proxyCore settings
        global FETCH_START_PAGE
        global FETCH_END_PAGE
        global PROXY_POOL_SIZE
        global PROXY_POOL_SCAN_INTERVAL
        global PROXY_VALIDATE_THREAD_NUM
        FETCH_START_PAGE = config.getint(section, "proxyCore_fetchStartPage")
        FETCH_END_PAGE = config.getint(section, "proxyCore_fetchEndPage")
        PROXY_POOL_SIZE = config.getint(section, "proxyCore_proxyPoolSize")
        PROXY_POOL_SCAN_INTERVAL = config.getint(section, "proxyCore_proxyPoolScanInterval")
        PROXY_VALIDATE_THREAD_NUM = config.getint(section, "proxyCore_proxyValidateThreadNum")
# Proxy validation thread
class ProxyValidateThread(threading.Thread):
    """Worker that validates candidate proxies and puts the usable ones into the pool.

    ``status`` is ``'running'`` from construction and is switched to
    ``'error'`` when ``run`` terminates because of an exception.
    """

    def __init__(self):
        super().__init__()
        self.status = 'running'
        # Validator instance used by this thread.
        self.dataValidateModule = validateData.DataValidateModule()

    def run(self):
        """Validate candidates in an endless loop until an exception occurs."""
        try:
            while True:
                # Stay paused for as long as the proxy pool is being scanned.
                while is_scanning:
                    time.sleep(3)

                # Nothing to do when the pool has reached its target size
                # or no candidate is waiting: back off and check again.
                if proxy_pool.qsize() >= PROXY_POOL_SIZE or unchecked_proxy_list.qsize() <= 0:
                    time.sleep(5)
                    continue

                candidate = unchecked_proxy_list.get()
                if self.dataValidateModule.validate_proxy_ip(candidate) is True:
                    proxy_pool.put(candidate)
                time.sleep(1)
        except Exception as e:
            if log.isEnabledFor(logging.ERROR):
                log.exception(e)
            # Read by the daemon thread to decide whether to replace this worker.
            self.status = 'error'
# Proxy pool scan thread: fetches new candidates and sends pooled proxies back for re-validation
class ProxyPoolScanThread(threading.Thread):
    """Worker that keeps the candidate list stocked and periodically re-checks the pool.

    ``status`` is ``'running'`` from construction and is switched to
    ``'error'`` when ``run`` terminates because of an exception.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.status = 'running'
        # Begin at the configured first page instead of a hard-coded 1, so
        # the first pass honours FETCH_START_PAGE like every later pass.
        self.current_page = FETCH_START_PAGE
        # Module that downloads the proxy listing pages.
        self.dataFetchModule = fetchData.DataFetchModule()
        # Module that parses the downloaded pages.
        self.dataParseModule = parseData.DataParseModule()

    def run(self):
        """Fetch candidates or rescan the pool in an endless loop until an exception occurs."""
        try:
            while True:
                pool_size = proxy_pool.qsize()
                if pool_size < PROXY_POOL_SIZE and unchecked_proxy_list.qsize() < PROXY_POOL_SIZE:
                    # Pool not full and few candidates left: fetch another page.
                    self.fetch_and_parse_proxy()
                elif pool_size >= PROXY_POOL_SIZE:
                    # ">=" rather than "==": several validation threads can
                    # each add a proxy after seeing room, pushing the pool
                    # past its target size; the scan must still run then.
                    if log.isEnabledFor(logging.DEBUG):
                        log.debug('代理池更新')
                    self.scan_proxy_pool()
                    time.sleep(PROXY_POOL_SCAN_INTERVAL)
                else:
                    # Pool not full but enough candidates are already queued.
                    time.sleep(60)
        except Exception as e:
            if log.isEnabledFor(logging.ERROR):
                log.exception(e)
            # Read by the daemon thread to decide whether to replace this worker.
            self.status = 'error'

    # Scan the proxies in the proxy pool
    @staticmethod
    def scan_proxy_pool():
        """Move every pooled proxy back to the unchecked list for re-validation.

        Because the unchecked list is a LIFO queue, scanning the pool only
        requires pushing its entries onto that list; the validation threads
        then re-validate them and put the usable ones back into the pool.
        ``is_scanning`` is set for the duration of the move so that the
        validation threads pause.
        """
        global is_scanning
        is_scanning = True
        try:
            while True:
                # Non-blocking get: a consumer may take the last proxy at any
                # moment, and a blocking get() on the emptied pool would hang
                # here with is_scanning still set.
                try:
                    proxy = proxy_pool.get_nowait()
                except queue.Empty:
                    break
                unchecked_proxy_list.put(proxy)
        finally:
            # Always release the validation threads, even if the move failed.
            is_scanning = False

    # Fetch and parse proxies
    def fetch_and_parse_proxy(self):
        """Fetch the current listing page, parse it and queue its proxies for validation.

        Wraps back to ``FETCH_START_PAGE`` once ``current_page`` has passed
        ``FETCH_END_PAGE``, and advances ``current_page`` by one after parsing.
        """
        if self.current_page > FETCH_END_PAGE:
            self.current_page = FETCH_START_PAGE
        response_data = self.dataFetchModule.fetch_proxy_data(self.current_page)
        proxy_data = self.dataParseModule.parse_data(response_data)
        self.current_page += 1
        # Add the parsed proxies to the list of proxies awaiting validation.
        for proxy in proxy_data:
            unchecked_proxy_list.put(proxy)
class ProxyService:
    """Entry point of the proxy service: owns the daemon thread and hands out proxies."""

    def __init__(self):
        # The daemon thread is only created here; start_proxy_service() starts it.
        daemon_thread = ProxyDaemonThread()
        self.proxy_daemon_thread = daemon_thread

    @staticmethod
    def get_proxy():
        """Take the next proxy from the pool, blocking until one is available."""
        next_proxy = proxy_pool.get()
        return next_proxy

    def start_proxy_service(self):
        """Start the daemon thread that runs the proxy workers."""
        self.proxy_daemon_thread.start()
# if __name__ == '__main__':
# proxyService = ProxyService()
# while True:
# print(proxyService.get_proxy())