-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathverify_dxfeed_data.py
More file actions
295 lines (240 loc) · 9.86 KB
/
verify_dxfeed_data.py
File metadata and controls
295 lines (240 loc) · 9.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
#!/usr/bin/env python3
"""
One-time script to verify DXFeed candle data against Redis
"""
import asyncio
import json
import websockets
import redis
import os
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
# TastyTrade API credentials
TASTYTRADE_API_URL = "https://api.tastyworks.com"
DXFEED_WS_URL = "wss://tasty-openapi-ws.dxfeed.com/realtime"
async def get_access_token():
"""Get TastyTrade access token from tokens file"""
token_file = '/home/ubuntu/trader/backend/tokens/tastytrade_tokens.json'
if not os.path.exists(token_file):
raise Exception(f"Tokens file not found: {token_file}")
with open(token_file, 'r') as f:
tokens = json.load(f)
access_token = tokens.get('access_token')
if not access_token:
raise Exception("No access_token found in tokens file")
print(f"✓ Loaded access token from {token_file}")
return access_token
async def query_dxfeed_candles():
"""Query DXFeed for specific 1-minute candles"""
print("Getting TastyTrade access token...")
token = await get_access_token()
print(f"Connecting to DXFeed WebSocket: {DXFEED_WS_URL}")
async with websockets.connect(DXFEED_WS_URL) as ws:
# Wait for SETUP
setup_msg = await ws.recv()
setup_data = json.loads(setup_msg)
print(f"SETUP: {setup_data}")
# Authenticate
auth_msg = {
"type": "AUTH",
"channel": 0,
"token": token
}
await ws.send(json.dumps(auth_msg))
# Wait for AUTH_STATE
while True:
msg = await ws.recv()
data = json.loads(msg)
if data.get('type') == 'AUTH_STATE':
print(f"AUTH_STATE: {data.get('state')}")
if data.get('state') == 'AUTHORIZED':
break
# Open channel
channel_msg = {
"type": "CHANNEL_REQUEST",
"channel": 1,
"service": "FEED",
"parameters": {"contract": "AUTO"}
}
await ws.send(json.dumps(channel_msg))
# Wait for CHANNEL_OPENED
while True:
msg = await ws.recv()
data = json.loads(msg)
if data.get('type') == 'CHANNEL_OPENED':
print(f"CHANNEL_OPENED: {data.get('channel')}")
break
# Setup feed
feed_setup_msg = {
"type": "FEED_SETUP",
"channel": 1,
"acceptAggregationPeriod": 0.1,
"acceptDataFormat": "COMPACT",
"acceptEventFields": {
"Candle": [
"eventType", "eventSymbol", "eventTime",
"time", "sequence", "count",
"open", "high", "low", "close", "volume"
]
}
}
await ws.send(json.dumps(feed_setup_msg))
# Wait for FEED_CONFIG
while True:
msg = await ws.recv()
data = json.loads(msg)
if data.get('type') == 'FEED_CONFIG':
print(f"FEED_CONFIG received")
break
# Subscribe to 1-minute candles with fromTime
# Nov 3, 2025 3:02 AM UTC = 1762128120000 ms
# Nov 3, 2025 3:04 AM UTC = 1762128240000 ms
# Request from 3:02 AM (will get 3:02, 3:03, 3:04)
from_time_ms = 1762128120000 # Nov 3, 2025 3:02:00 AM UTC
print(f"\nRequesting candles from {from_time_ms} ({datetime.fromtimestamp(from_time_ms/1000, tz=timezone.utc)})")
subscription_msg = {
"type": "FEED_SUBSCRIPTION",
"channel": 1,
"reset": False,
"add": [
{
"type": "Candle",
"symbol": "/ESZ25:XCME{=1m}",
"fromTime": from_time_ms
}
]
}
await ws.send(json.dumps(subscription_msg))
print("\nWaiting for candle data...")
candles = []
timeout_count = 0
# Collect candles for 10 seconds
while timeout_count < 20:
try:
msg = await asyncio.wait_for(ws.recv(), timeout=0.5)
data = json.loads(msg)
if data.get('type') == 'FEED_DATA':
feed_data = data.get('data', [])
for item in feed_data:
if isinstance(item, list) and item[0] == 'Candle':
# Parse candle data
event_symbol = item[1]
time_ms = item[3]
open_price = item[6]
high_price = item[7]
low_price = item[8]
close_price = item[9]
volume = item[10]
timestamp = datetime.fromtimestamp(time_ms / 1000, tz=timezone.utc)
# Filter for our target times
if time_ms in [1762128120000, 1762128180000, 1762128240000]:
candles.append({
'time_ms': time_ms,
'timestamp': timestamp,
'open': open_price,
'high': high_price,
'low': low_price,
'close': close_price,
'volume': volume
})
print(f" ✓ Got candle: {timestamp.isoformat()} O={open_price} H={high_price} L={low_price} C={close_price} V={volume}")
# Stop once we have all 3 candles
if len(candles) >= 3:
print("\n✓ Collected all 3 candles from DXFeed")
return candles
except asyncio.TimeoutError:
timeout_count += 1
if len(candles) > 0:
print(f" Waiting... (collected {len(candles)}/3 candles)")
print(f"\n✓ Collected {len(candles)} candles from DXFeed")
return candles
def query_redis_candles():
"""Query Redis for the same candles"""
print("\n" + "="*80)
print("Querying Redis for /ES 1-minute bars...")
print("="*80)
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# Query the bars stream for /ES:1
stream_key = "bars_stream:/ES:1"
# Get all entries and filter for our target times
# Nov 3, 2025 3:02 AM UTC = 2025-11-03T03:02:00
# Nov 3, 2025 3:03 AM UTC = 2025-11-03T03:03:00
# Nov 3, 2025 3:04 AM UTC = 2025-11-03T03:04:00
target_times = [
"2025-11-03T03:02:00+00:00",
"2025-11-03T03:03:00+00:00",
"2025-11-03T03:04:00+00:00"
]
redis_candles = []
# Query recent entries (get last 100 to ensure we have enough history)
entries = r.xrevrange(stream_key, '+', '-', count=200)
print(f"Found {len(entries)} total entries in Redis stream")
for entry_id, fields in entries:
timestamp = fields.get('timestamp', '')
if timestamp in target_times:
candle = {
'timestamp': timestamp,
'open': float(fields.get('open', 0)),
'high': float(fields.get('high', 0)),
'low': float(fields.get('low', 0)),
'close': float(fields.get('close', 0)),
'volume': int(fields.get('volume', 0))
}
redis_candles.append(candle)
print(f" ✓ Found in Redis: {timestamp} O={candle['open']} H={candle['high']} L={candle['low']} C={candle['close']} V={candle['volume']}")
# Sort by timestamp
redis_candles.sort(key=lambda x: x['timestamp'])
print(f"\n✓ Found {len(redis_candles)} matching candles in Redis")
return redis_candles
def compare_candles(dxfeed_candles, redis_candles):
"""Compare DXFeed and Redis candles"""
print("\n" + "="*80)
print("COMPARISON RESULTS")
print("="*80)
if len(dxfeed_candles) != len(redis_candles):
print(f"⚠️ WARNING: Different number of candles!")
print(f" DXFeed: {len(dxfeed_candles)} candles")
print(f" Redis: {len(redis_candles)} candles")
# Sort DXFeed candles by timestamp
dxfeed_candles.sort(key=lambda x: x['time_ms'])
print(f"\nComparing {min(len(dxfeed_candles), len(redis_candles))} candles...\n")
all_match = True
for i in range(min(len(dxfeed_candles), len(redis_candles))):
dxfeed = dxfeed_candles[i]
redis_candle = redis_candles[i]
dxfeed_time = dxfeed['timestamp'].isoformat()
redis_time = redis_candle['timestamp']
print(f"Candle #{i+1}: {dxfeed_time}")
print(f" {'Field':<10} {'DXFeed':<15} {'Redis':<15} {'Match':<10}")
print(f" {'-'*10} {'-'*15} {'-'*15} {'-'*10}")
# Compare each field
fields = ['open', 'high', 'low', 'close', 'volume']
for field in fields:
dxfeed_val = dxfeed[field]
redis_val = redis_candle[field]
match = "✓ MATCH" if dxfeed_val == redis_val else "✗ MISMATCH"
if dxfeed_val != redis_val:
all_match = False
print(f" {field:<10} {str(dxfeed_val):<15} {str(redis_val):<15} {match:<10}")
print()
if all_match:
print("="*80)
print("✅ ALL CANDLES MATCH PERFECTLY!")
print("="*80)
else:
print("="*80)
print("❌ SOME MISMATCHES FOUND")
print("="*80)
async def main():
print("="*80)
print("DXFeed vs Redis Data Verification")
print("Comparing /ES 1-minute candles for Nov 3, 2025 3:02-3:04 AM UTC")
print("="*80)
# Query DXFeed
dxfeed_candles = await query_dxfeed_candles()
# Query Redis
redis_candles = query_redis_candles()
# Compare
compare_candles(dxfeed_candles, redis_candles)
if __name__ == "__main__":
asyncio.run(main())