-
Notifications
You must be signed in to change notification settings - Fork 5
/
connect.py
342 lines (309 loc) · 12.2 KB
/
connect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
import threading
import time
from datetime import datetime, timedelta, timezone
from time import sleep
import algo.init as algo
import bots.init as bots
import common.init as common
import functions
import services as service
from api.api import WS, Markets
from api.bitmex.ws import Bitmex
from api.bybit.ws import Bybit
from api.deribit.ws import Deribit
from bots.variables import Variables as bot
from common.variables import Variables as var
from display.functions import info_display
from display.variables import TreeTable
from display.variables import Variables as disp
from functions import Function
disp.root.bind("<F3>", lambda event: terminal_reload(event))
disp.root.bind("<F9>", lambda event: trade_state(event))
Bitmex.transaction = Function.transaction
Bybit.transaction = Function.transaction
Deribit.transaction = Function.transaction
disp.label_f9.config(bg=disp.red_color)
def setup(reload=False):
"""
This function works the first time you start the program or when you
reboot after pressing F3. Markets are loaded using setup_market() in
parallel in threads to speed up the loading process.
"""
clear_params()
common.setup_database_connecion()
var.robots_thread_is_active = False
threads = []
for name in var.market_list:
t = threading.Thread(target=setup_market, args=(Markets[name], reload))
threads.append(t)
t.start()
[thread.join() for thread in threads]
for name in var.market_list:
finish_setup(Markets[name])
merge_orders()
functions.init_tables()
var.robots_thread_is_active = True
thread = threading.Thread(target=robots_thread)
thread.start()
def setup_market(ws: Markets, reload=False):
"""
Market reboot. During program operation, when accessing endpoints or
receiving information from websockets, errors may occur due to the loss of
the Internet connection or errors for other reasons. If the program
detects such a case, it reboots the market to restore data integrity.
The download process may take time, because there are a large number
of calls to endpoints and websocket subscriptions. To speed up, many calls
are performed in parallel threads, within which parallel threads can also
be opened. If any download component is not received, the program will
restart again from the very beginning.
The download process is done in stages because the order in which the
information is received matters. Loading sequence:
1) All active instruments.
2) All active orders. After receiving orders, it may happen that the order
is executed even before the websocket comes up. In this case, the
websocket will not send execution, but the integrity of the information
will not be lost, because execution of order will be processed at the
end of loading in the load_trading_history() function.
3) Simultaneous download:
1. Subscribe to websockets only for those instruments that are
specified in the .env files.
2. Getting the user id.
3. Obtaining information on account balances.
4. Obtaining initial information about the positions of signed
instruments.
4) Reading active bots from the database.
5) Simultaneous download:
1. Receiving klines only for those instruments and timeframes that are
used by bots.
2. Trading history.
"""
def get_timeframes(ws, success, num):
if bots.Init.init_timeframes(ws):
success[num] = "success"
def get_history(ws, success, num):
if common.Init.load_trading_history(ws):
success[num] = "success"
ws.logNumFatal = -1
ws.api_is_active = False
if reload:
WS.exit(ws)
sleep(3)
while ws.logNumFatal:
var.queue_order.put({"action": "clear", "market": ws.name})
ws.logNumFatal = WS.start_ws(ws)
if ws.logNumFatal:
WS.exit(ws)
sleep(2)
else:
common.Init.clear_params(ws)
ws.logNumFatal = bots.Init.load_robots(ws)
if not ws.logNumFatal:
algo.init_algo(ws)
threads, success = [], []
success.append(None)
t = threading.Thread(
target=get_timeframes,
args=(ws, success, len(success) - 1),
)
threads.append(t)
t.start()
success.append(None)
t = threading.Thread(
target=get_history,
args=(ws, success, len(success) - 1),
)
threads.append(t)
t.start()
[thread.join() for thread in threads]
for s in success:
if not s:
var.logger.error(
ws.name + ": The kline data or trade history is not loaded."
)
ws.logNumFatal = -1
else:
var.logger.info("No robots loaded.")
sleep(2)
if ws.logNumFatal:
var.logger.info("\n\n")
var.logger.info(
"Something went wrong. "
+ ws.name
+ " is not loading. See logFile.log. Reboot.\n\n"
)
WS.exit(ws)
sleep(3)
def merge_orders():
orders_list = list()
for name in var.market_list:
orders_list += Markets[name].orders.values()
orders_list.sort(key=lambda x: x["transactTime"])
for order in orders_list:
var.queue_order.put({"action": "put", "order": order})
def finish_setup(ws: Markets):
"""
This part of the setup does not interact with HTTP, so there is no need to
load data from different threads to speed up the program and this function
is executed from the main loop. Moreover, the function uses
load_database() to fill data into the Treeview tables, which, according to
Tkinter capabilities, is only possible from the main loop.
"""
common.Init.load_database(ws)
common.Init.account_balances(ws)
common.Init.load_orders(ws, ws.setup_orders)
bots.Init.delete_unused_robot(ws)
for emi, value in ws.robot_status.items():
if emi in ws.robots:
ws.robots[emi]["STATUS"] = value
ws.message_time = datetime.now(tz=timezone.utc)
ws.api_is_active = True
def reload_market(ws: Markets):
ws.api_is_active = False
Function.market_status(
ws, status="RELOADING...", message="Reloading...", error=True
)
TreeTable.market.tree.update()
setup_market(ws=ws, reload=True)
var.queue_reload.put(ws)
def refresh() -> None:
while not var.queue_reload.empty():
ws = var.queue_reload.get()
finish_setup(ws=ws)
merge_orders()
Function.market_status(ws, status="ONLINE", message="", error=False)
functions.clear_tables()
while not var.queue_info.empty():
info = var.queue_info.get()
info_display(
name=info["market"],
message=info["message"],
tm=info["time"],
warning=info["warning"],
)
while not var.queue_order.empty():
"""
The queue thread-safely displays current orders that can be queued:
1. From the websockets of the markets.
2. When retrieving current orders from the endpoints when loading or
reloading the market.
3. When processing the trading history data.
Possible queue jobs:
1. "action": "put"
Display a row with the new order in the table. If an order with the
same clOrdID already exists, then first remove it from the table
and print the order on the first line.
2. "action": "delete"
Delete order by clOrdID.
3. "action": "clear"
Before reloading the market, delete all orders of a particular
market from the table, because the reboot process will update
information about current orders, so possibly canceled orders
during the reloading will be removed.
"""
job = var.queue_order.get()
if job["action"] == "delete":
clOrdID = job["clOrdID"]
if clOrdID in TreeTable.orders.children:
TreeTable.orders.delete(iid=clOrdID)
elif job["action"] == "put":
order = job["order"]
clOrdID = order["clOrdID"]
ws = Markets[order["MARKET"]]
if clOrdID in ws.orders:
Function.orders_display(ws, val=order)
elif job["action"] == "clear":
TreeTable.orders.clear_all(market=job["market"])
for name in var.market_list:
ws = Markets[name]
utc = datetime.now(tz=timezone.utc)
if ws.logNumFatal == 0:
if ws.api_is_active:
if utc > ws.message_time + timedelta(seconds=10):
if not WS.ping_pong(ws):
info_display(
ws.name,
"The websocket does not respond within 10 sec. Reboot",
warning=True,
)
ws.logNumFatal = 1001 # reloading
ws.message_time = utc
elif ws.logNumFatal > 0:
if ws.logNumFatal > 2000:
if ws.message2000 == "":
ws.message2000 = (
"Fatal error=" + str(ws.logNumFatal) + ". Market is frozen"
)
Function.market_status(
ws, status="Error", message=ws.message2000, error=True
)
sleep(1)
elif ws.logNumFatal >= 1000 or ws.timeoutOccurred != "": # reload
if ws.api_is_active:
t = threading.Thread(target=reload_market, args=(ws,))
t.start()
else:
if ws.logNumFatal > 0 and ws.logNumFatal <= 10:
if ws.messageStopped == "":
ws.messageStopped = (
"Error=" + str(ws.logNumFatal) + ". Trading stopped"
)
info_display(name, ws.messageStopped)
if ws.logNumFatal == 2:
info_display(name, "Insufficient available balance!")
disp.f9 = "OFF"
disp.label_f9.config(bg=disp.red_color)
ws.logNumFatal = 0
var.lock_market_switch.acquire(True)
ws = Markets[var.current_market]
if ws.api_is_active:
Function.refresh_on_screen(Markets[var.current_market], utc=utc)
var.lock_market_switch.release()
def clear_params():
var.symbol = var.env[var.current_market]["SYMBOLS"][0]
def robots_thread() -> None:
def bot_in_thread():
# Bots entry point
bot.robo[robot["emi"]](
robot=robot["robot"],
frame=robot["frame"],
instrument=robot["instrument"],
)
while var.robots_thread_is_active:
utcnow = datetime.now(tz=timezone.utc)
bot_list = list()
for market in var.market_list:
ws = Markets[market]
if ws.api_is_active:
if ws.frames:
bot_list = Function.robots_entry(ws, bot_list, utc=utcnow)
threads = []
for robot in bot_list:
t = threading.Thread(target=bot_in_thread)
threads.append(t)
t.start()
[thread.join() for thread in threads]
rest = 1 - time.time() % 1
time.sleep(rest)
def terminal_reload(event) -> None:
var.robots_thread_is_active = ""
functions.info_display("Tmatic", "Restarting...")
service.close(Markets)
disp.root.update()
setup()
functions.clear_tables()
def trade_state(event) -> None:
if disp.f9 == "ON":
disp.f9 = "OFF"
disp.label_f9.config(bg=disp.red_color)
elif disp.f9 == "OFF":
disp.f9 = "ON"
disp.label_f9.config(bg=disp.green_color)
for market in var.market_list:
Markets[market].logNumFatal = 0
print(market, disp.f9)
disp.label_f9["text"] = disp.f9
def on_closing(root, refresh_var):
root.after_cancel(refresh_var)
root.destroy()
service.close(Markets)
# os.abort()