-
Notifications
You must be signed in to change notification settings - Fork 8
/
DemoTextDataReplayProvider.java
160 lines (136 loc) · 5.33 KB
/
DemoTextDataReplayProvider.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package velox.api.layer0.replay;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import com.google.gson.Gson;
import velox.api.layer0.annotations.Layer0ReplayModule;
import velox.api.layer0.data.FileEndReachedUserMessage;
import velox.api.layer0.data.FileNotSupportedUserMessage;
import velox.api.layer0.data.ReadFileLoginData;
import velox.api.layer1.Layer1ApiListener;
import velox.api.layer1.annotations.Layer1ApiVersion;
import velox.api.layer1.annotations.Layer1ApiVersionValue;
import velox.api.layer1.data.InstrumentInfo;
import velox.api.layer1.data.LoginData;
import velox.api.layer1.data.TradeInfo;
import velox.api.layer1.reading.UserDataUserMessage;
/**
 * Allows reading simple text format (that mimics {@link Layer1ApiListener}
 * methods) to be replayed by Bookmap.
 *
 * <p>Each line of the file consists of fields separated by {@code ;;;}:
 * the first field is a timestamp parsed as a {@code long}, the second is an
 * event code, and the remaining fields are the arguments of that event:
 * <ul>
 * <li>{@code onInstrumentAdded}: alias, {@link InstrumentInfo} as JSON</li>
 * <li>{@code onTrade}: alias, price (double), size (int), {@link TradeInfo}
 * as JSON</li>
 * <li>{@code onDepth}: alias, isBid (boolean), price (int), size (int)</li>
 * <li>{@code onUserDataUserMessage}: tag, alias, data</li>
 * </ul>
 *
 * <p>The first line is consumed synchronously inside
 * {@link #login(LoginData)}; the remaining lines are consumed by a dedicated
 * reader thread.
 */
@Layer1ApiVersion(Layer1ApiVersionValue.VERSION2)
@Layer0ReplayModule
public class DemoTextDataReplayProvider extends ExternalReaderBaseProvider {

    /** Only files whose name ends with this suffix are accepted. */
    private static final String SUPPORTED_SUFFIX = ".simpleformat.txt";

    /** Separator between the fields of a single line. */
    private static final String FIELD_SEPARATOR = ";;;";

    private final Gson gson = new Gson();

    /** Thread replaying the file; stays {@code null} until login succeeds. */
    private Thread readerThread;

    /**
     * Timestamp of the most recently parsed line. Written by the thread that
     * parses lines and read through {@link #getCurrentTime()}, hence volatile.
     */
    private volatile long currentTime = 0;

    /**
     * Cleared once the end of the file was reported. Read by the reader
     * thread and writable through the public {@link #reportFileEnd()}, hence
     * volatile.
     */
    private volatile boolean play = true;

    /** Source of the lines; stays {@code null} until the file was opened. */
    private BufferedReader reader;

    /**
     * Opens the file referenced by {@code loginData}, parses its first line
     * and starts the thread replaying the rest of the file.
     *
     * <p>If the file name has an unexpected suffix, the file cannot be read,
     * or the first line cannot be parsed, the admin listeners receive a
     * {@link FileNotSupportedUserMessage} and no reader thread is started.
     *
     * @param loginData login data; cast to {@link ReadFileLoginData}
     */
    @Override
    public void login(LoginData loginData) {
        ReadFileLoginData fileData = (ReadFileLoginData) loginData;

        try {
            // For demo purposes let's just check the extension.
            // Usually you will want to take a look at file content here to
            // ensure it's expected file format
            if (!fileData.file.getName().endsWith(SUPPORTED_SUFFIX)) {
                throw new IOException("File extension not supported");
            }

            reader = new BufferedReader(new FileReader(fileData.file));

            // Reading one line to guarantee that when we exit this method
            // getCurrentTime will return meaningful result.
            // Alternative is to wait for first line to be read by
            // readerThread.
            // Reading it here also allows a bit of extra validation, since
            // in case of error it's still possible to report that file is
            // not supported
            readLine();

            readerThread = new Thread(this::read);
            readerThread.start();
        } catch (@SuppressWarnings("unused") IOException | RuntimeException e) {
            // RuntimeException covers a malformed first line (bad number,
            // missing fields, unknown event code). The reader thread was not
            // started, so release the file handle right away.
            closeReaderQuietly();
            adminListeners.forEach(listener -> listener.onUserMessage(new FileNotSupportedUserMessage()));
        }
    }

    /**
     * Body of the reader thread: parses lines until the thread is
     * interrupted or the end of the file was reported.
     *
     * <p>Any failure while reading or parsing a line is reported to the
     * listeners as the end of the file; unchecked exceptions are rethrown
     * afterwards so that they are not silently lost.
     */
    private void read() {
        try {
            while (!Thread.interrupted() && play) {
                readLine();
            }
        } catch (@SuppressWarnings("unused") IOException e) {
            reportFileEnd();
        } catch (RuntimeException e) {
            // Malformed line or unknown event code: replay cannot continue.
            reportFileEnd();
            throw e;
        }
    }

    /**
     * Notifies the admin listeners with a {@link FileEndReachedUserMessage}
     * and stops further reading.
     */
    public void reportFileEnd() {
        adminListeners.forEach(listener -> listener.onUserMessage(new FileEndReachedUserMessage()));
        play = false;
    }

    /**
     * Reads a single line, updates the current time and dispatches the event
     * described by the line to the matching listeners.
     *
     * <p>When there are no more lines the end of the file is reported, unless
     * that already happened.
     *
     * @throws IOException if the underlying reader fails
     * @throws RuntimeException if the line is malformed or its event code is
     *         unknown
     */
    private void readLine() throws IOException {
        String line = reader.readLine();
        if (line == null) {
            // End of stream: nothing to parse. Report it only once.
            if (play) {
                reportFileEnd();
            }
            return;
        }

        String[] tokens = line.split(FIELD_SEPARATOR);
        currentTime = Long.parseLong(tokens[0]);
        String eventCode = tokens[1];

        switch (eventCode) {
        case "onInstrumentAdded": {
            String alias = tokens[2];
            InstrumentInfo instrumentInfo = gson.fromJson(tokens[3], InstrumentInfo.class);
            instrumentListeners.forEach(
                    l -> l.onInstrumentAdded(alias, instrumentInfo));
            break;
        }
        case "onTrade": {
            String alias = tokens[2];
            double price = Double.parseDouble(tokens[3]);
            int size = Integer.parseInt(tokens[4]);
            TradeInfo tradeInfo = gson.fromJson(tokens[5], TradeInfo.class);
            dataListeners.forEach(
                    l -> l.onTrade(alias, price, size, tradeInfo));
            break;
        }
        case "onDepth": {
            String alias = tokens[2];
            boolean isBid = Boolean.parseBoolean(tokens[3]);
            int price = Integer.parseInt(tokens[4]);
            int size = Integer.parseInt(tokens[5]);
            dataListeners.forEach(
                    l -> l.onDepth(alias, isBid, price, size));
            break;
        }
        case "onUserDataUserMessage": {
            String tag = tokens[2];
            String alias = tokens[3];
            // NOTE(review): getBytes() without arguments uses the platform
            // default charset - confirm that this is what consumers expect.
            byte[] data = tokens[4].getBytes();
            adminListeners.forEach(
                    l -> l.onUserMessage(
                            new UserDataUserMessage(tag, alias, data)));
            break;
        }
        default:
            // Callers decide how to report this: login() treats it as an
            // unsupported file, read() as the end of the file.
            throw new RuntimeException("Unknown event code " + eventCode);
        }
    }

    /**
     * @return timestamp taken from the most recently parsed line, or
     *         {@code 0} if no line was parsed yet
     */
    @Override
    public long getCurrentTime() {
        return currentTime;
    }

    @Override
    public String getSource() {
        // String identifying where data came from.
        // For example you can use that later in your indicator.
        return "simple example data";
    }

    /**
     * Interrupts the reader thread (if it was started) and closes the file
     * (if it was opened). Safe to call even when login failed.
     */
    @Override
    public void close() {
        Thread thread = readerThread;
        if (thread != null) {
            thread.interrupt();
        }
        closeReaderQuietly();
    }

    /** Closes the reader if it was opened, ignoring failures while closing. */
    private void closeReaderQuietly() {
        BufferedReader current = reader;
        if (current == null) {
            return;
        }
        try {
            current.close();
        } catch (@SuppressWarnings("unused") IOException e) {
            // Nothing useful can be done if closing fails.
        }
    }
}