-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathtest_corrected_websocket.py
More file actions
91 lines (73 loc) · 3.3 KB
/
test_corrected_websocket.py
File metadata and controls
91 lines (73 loc) · 3.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"""
Test the corrected WebSocket connection using exact curl parameters
"""
import asyncio
import os
import dotenv
from BinomoAPI import BinomoAPI
dotenv.load_dotenv()
async def test_corrected_websocket():
    """Exercise the WebSocket connection end to end against a demo client.

    Logs in with the ``email`` / ``password`` environment variables, builds an
    API client from the login response (``demo=True``), then tries to connect.
    On a successful connection one CALL and one PUT option are placed on
    EUR/USD. If connecting raises, the balance and asset lookups are tried
    instead. Progress is reported with ``print``; nothing is returned.

    The client is always closed, even if the run is interrupted or cancelled.

    Raises:
        RuntimeError: if ``email`` or ``password`` is unset or empty.
    """
    # Fail fast with a clear message instead of handing None to the login call.
    email = os.getenv("email")
    password = os.getenv("password")
    if not email or not password:
        raise RuntimeError(
            'Missing credentials: set the "email" and "password" environment '
            "variables (or define them in a .env file)"
        )

    print("🧪 Testing Corrected WebSocket Connection")
    print("=" * 60)

    # Step 1: Login to get real credentials
    print("🔐 Step 1: Login and get real credentials...")
    login_response = BinomoAPI.login(email, password)
    # Only the first 20 characters of the token are echoed.
    print(f"✅ Login successful! Token: {login_response.authtoken[:20]}...")
    print(f"✅ Device ID: {login_response.user_id}")

    # Step 2: Create API with corrected WebSocket
    print("\n🔧 Step 2: Creating API with corrected WebSocket...")
    api = BinomoAPI.create_from_login(
        login_response=login_response,
        device_id=login_response.user_id,  # the user id doubles as the device id here
        demo=True,
    )
    print("✅ API created with corrected WebSocket client")

    # Step 3: Test WebSocket connection
    print("\n🌐 Step 3: Testing corrected WebSocket connection...")
    try:
        connected = await api.connect()
        if connected:
            print("🎉 SUCCESS! WebSocket connected with corrected parameters!")
            print("✅ The curl parameters fixed the connection issue!")
            await _run_trading_checks(api)
        else:
            print("❌ WebSocket connection still failed")
            print(" May need additional authentication steps")
    except Exception as e:
        print(f"❌ WebSocket connection error: {e}")
        print("🔍 Check if additional authentication is needed")
        # NOTE(review): the non-WebSocket checks are assumed to belong to this
        # failure path ("what still works") — confirm against the intended flow.
        await _run_non_websocket_checks(api)
    finally:
        # Cleanup must happen on every exit path, not only the happy one.
        await api.close()

    print("\n✅ Test completed")


async def _run_trading_checks(api):
    """Place one CALL and one PUT option and report the outcome."""
    print("\n📈 Step 4: Testing WebSocket trading...")
    try:
        # Test place call option
        call_result = await api.place_call_option("EUR/USD", 1.0, 60)
        print(f"✅ CALL option placed successfully: {call_result}")

        # Test place put option
        put_result = await api.place_put_option("EUR/USD", 1.0, 60)
        print(f"✅ PUT option placed successfully: {put_result}")

        print("\n🎯 COMPLETE SUCCESS!")
        print("✅ WebSocket connection working")
        print("✅ Trading functions working")
        print("✅ API fully functional!")
    except Exception as trade_error:
        # Best-effort: a trading failure is reported, not propagated.
        print(f"⚠️ Trading error (connection OK): {trade_error}")
        print(" WebSocket connected but trading needs refinement")


async def _run_non_websocket_checks(api):
    """Try the balance and asset lookups, reporting each result independently."""
    print("\n📊 Testing non-WebSocket functions...")

    # Test balance
    try:
        balance = await api.get_balance()
        print(f"✅ Balance: ${balance.amount}")
    except Exception as be:
        print(f"⚠️ Balance error: {be}")

    # Test assets
    try:
        assets = api.get_available_assets()
        print(f"✅ Assets: {len(assets)} available")
    except Exception as ae:
        print(f"⚠️ Assets error: {ae}")
if __name__ == "__main__":
    # Script entry point: build the coroutine, then let asyncio.run drive it
    # to completion.
    main_coroutine = test_corrected_websocket()
    asyncio.run(main_coroutine)