-
-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Expand file tree
/
Copy pathConsolidateRegressionAlgorithm.py
More file actions
131 lines (102 loc) · 7.19 KB
/
ConsolidateRegressionAlgorithm.py
File metadata and controls
131 lines (102 loc) · 7.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from AlgorithmImports import *
from CustomDataRegressionAlgorithm import Bitcoin
### <summary>
### Regression algorithm reproducing data type bugs in the Consolidate API. Related to GH 4205.
### </summary>
class ConsolidateRegressionAlgorithm(QCAlgorithm):
# Initialise the data and resolution required, as well as the cash and start-end dates for your algorithm. All algorithms must initialized.
def initialize(self):
self.set_start_date(2020, 1, 5)
self.set_end_date(2020, 1, 20)
SP500 = Symbol.create(Futures.Indices.SP_500_E_MINI, SecurityType.FUTURE, Market.CME)
symbol = list(sorted(self.futures_chain(SP500).contracts.keys(), key=lambda symbol: symbol.id.date))[0]
self._future = self.add_future_contract(symbol)
tradable_dates_count = len(list(Time.each_tradeable_day_in_time_zone(self._future.exchange.hours,
self.start_date,
self.end_date,
self._future.exchange.time_zone,
False)))
self._expected_consolidation_counts = []
self.consolidate(symbol, Calendar.MONTHLY, lambda bar: self.update_monthly_consolidator(bar)) # shouldn't consolidate
self.consolidate(symbol, Calendar.WEEKLY, TickType.TRADE, lambda bar: self.update_weekly_consolidator(bar))
self.consolidate(symbol, Resolution.DAILY, lambda bar: self.update_trade_bar(bar, 0))
self._expected_consolidation_counts.append(tradable_dates_count)
self.consolidate(symbol, Resolution.DAILY, TickType.QUOTE, lambda bar: self.update_quote_bar(bar, 1))
self._expected_consolidation_counts.append(tradable_dates_count)
self.consolidate(symbol, timedelta(1), lambda bar: self.update_trade_bar(bar, 2))
self._expected_consolidation_counts.append(tradable_dates_count - 1)
self.consolidate(symbol, timedelta(1), TickType.QUOTE, lambda bar: self.update_quote_bar(bar, 3))
self._expected_consolidation_counts.append(tradable_dates_count - 1)
# sending None tick type
self.consolidate(symbol, timedelta(1), None, lambda bar: self.update_trade_bar(bar, 4))
self._expected_consolidation_counts.append(tradable_dates_count - 1)
self.consolidate(symbol, Resolution.DAILY, None, lambda bar: self.update_trade_bar(bar, 5))
self._expected_consolidation_counts.append(tradable_dates_count)
self._consolidation_counts = [0] * len(self._expected_consolidation_counts)
self._smas = [SimpleMovingAverage(10) for x in self._consolidation_counts]
self._last_sma_updates = [datetime.min for x in self._consolidation_counts]
self._monthly_consolidator_sma = SimpleMovingAverage(10)
self._monthly_consolidation_count = 0
self._weekly_consolidator_sma = SimpleMovingAverage(10)
self._weekly_consolidation_count = 0
self._last_weekly_sma_update = datetime.min
# custom data
self._custom_data_consolidator = 0
custom_symbol = self.add_data(Bitcoin, "BTC", Resolution.MINUTE).symbol
self.consolidate(custom_symbol, timedelta(1), lambda bar: self.increment_counter(1))
def increment_counter(self, id):
if id == 1:
self._custom_data_consolidator += 1
def update_trade_bar(self, bar, position):
self._smas[position].update(bar.end_time, bar.volume)
self._last_sma_updates[position] = bar.end_time
self._consolidation_counts[position] += 1
def update_quote_bar(self, bar, position):
self._smas[position].update(bar.end_time, bar.ask.high)
self._last_sma_updates[position] = bar.end_time
self._consolidation_counts[position] += 1
def update_monthly_consolidator(self, bar):
self._monthly_consolidator_sma.update(bar.end_time, bar.volume)
self._monthly_consolidation_count += 1
def update_weekly_consolidator(self, bar):
self._weekly_consolidator_sma.update(bar.end_time, bar.volume)
self._last_weekly_sma_update = bar.end_time
self._weekly_consolidation_count += 1
def on_end_of_algorithm(self):
for i, expected_consolidation_count in enumerate(self._expected_consolidation_counts):
consolidation_count = self._consolidation_counts[i]
if consolidation_count != expected_consolidation_count:
raise ValueError(f"Unexpected consolidation count for index {i}: expected {expected_consolidation_count} but was {consolidation_count}")
expected_weekly_consolidations = (self.end_date - self.start_date).days // 7
if self._weekly_consolidation_count != expected_weekly_consolidations:
raise ValueError(f"Expected {expected_weekly_consolidations} weekly consolidations but found {self._weekly_consolidation_count}")
if self._custom_data_consolidator == 0:
raise ValueError("Custom data consolidator did not consolidate any data")
for i, sma in enumerate(self._smas):
if sma.samples != self._expected_consolidation_counts[i]:
raise AssertionError(f"Expected {self._expected_consolidation_counts[i]} samples in each SMA but found {sma.samples} in SMA in index {i}")
last_update = self._last_sma_updates[i]
if sma.current.time != last_update:
raise AssertionError(f"Expected SMA in index {i} to have been last updated at {last_update} but was {sma.current.time}")
if self._monthly_consolidation_count != 0 or self._monthly_consolidator_sma.samples != 0:
raise AssertionError("Expected monthly consolidator to not have consolidated any data")
if self._weekly_consolidator_sma.samples != expected_weekly_consolidations:
raise AssertionError(f"Expected {expected_weekly_consolidations} samples in the weekly consolidator SMA but found {self._weekly_consolidator_sma.samples}")
if self._weekly_consolidator_sma.current.time != self._last_weekly_sma_update:
raise AssertionError(f"Expected weekly consolidator SMA to have been last updated at {self._last_weekly_sma_update} but was {self._weekly_consolidator_sma.current.time}")
# on_data event is the primary entry point for your algorithm. Each new data point will be pumped in here.
def on_data(self, data):
if not self.portfolio.invested and self._future.has_data:
self.set_holdings(self._future.symbol, 0.5)