-
Notifications
You must be signed in to change notification settings - Fork 8
/
client.go
287 lines (251 loc) · 9.06 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
package grafsy
import (
"log"
"net"
"os"
"path"
"strings"
"sync"
"time"
)
// Client is a class wich sends metrics to the carbon receivers
type Client struct {
// User config.
Conf *Config
// Local config.
Lc *LocalConfig
// Pointer to Monitoring structure.
Mon *Monitoring
// Monitoring channels per carbon
monChannels map[string]chan string
// Main channel per carbon
mainChannels map[string]chan string
}
var chanLock sync.Mutex
// Create a directory for retry files
func (c Client) createRetryDir() error {
err := os.MkdirAll(c.Conf.RetryDir, 0750)
return err
}
// Save []string to file.
func (c Client) saveSliceToRetry(metrics []string, carbonAddr string) error {
// If size of file is bigger, than max size we will remove lines from this file,
// and will call this function again to check result and write to the file.
// Recursion:)
c.Lc.lg.Printf("Resaving %d metrics back to the retry-file", len(metrics))
retFile := path.Join(c.Conf.RetryDir, carbonAddr)
f, err := os.OpenFile(retFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
if err != nil {
c.Lc.lg.Println(err)
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].dropped, len(metrics))
return err
}
defer f.Close()
dropped := 0
for _, metric := range metrics {
_, err = f.WriteString(metric + "\n")
if err != nil {
c.Lc.lg.Println(err)
dropped++
}
}
if dropped > 0 {
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].dropped, dropped)
}
return c.removeOldDataFromRetryFile(carbonAddr)
}
// Save part of entire content of channel to file.
func (c Client) saveChannelToRetry(ch chan string, size int, carbonAddr string) {
// If size of file is bigger, than max size we will remove lines from this file,
// and will call this function again to check result and write to the file.
// Recursion:)
// We save all metrics from channels on the program ending
// In this case on the size=0 the whole channel is saved
if size == 0 {
size = len(ch)
}
c.Lc.lg.Printf("Saving %d metrics from channel to the retry-file", size)
retFile := path.Join(c.Conf.RetryDir, carbonAddr)
f, err := os.OpenFile(retFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
if err != nil {
c.Lc.lg.Println(err.Error())
}
defer f.Close()
dropped, saved := 0, 0
for i := 0; i < size; i++ {
_, err = f.WriteString(<-ch + "\n")
if err != nil {
dropped++
c.Lc.lg.Println(err.Error())
} else {
saved++
}
}
if dropped > 0 {
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].dropped, dropped)
}
if saved > 0 {
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].saved, saved)
}
c.removeOldDataFromRetryFile(carbonAddr)
}
// Cleaning up retry-file.
// Entire file is sorted to have newest metrics at the beginning.
func (c Client) removeOldDataFromRetryFile(carbonAddr string) error {
retFile := path.Join(c.Conf.RetryDir, carbonAddr)
currentLinesInFile := getSizeInLinesFromFile(retFile)
if currentLinesInFile > c.Lc.fileMetricSize {
c.Lc.lg.Printf("I can not save to %s more, than %d. I will have to drop the rest (%d)",
retFile, c.Lc.fileMetricSize, currentLinesInFile-c.Lc.fileMetricSize)
// We save first c.Lc.fileMetricSize of metrics (newest)
wholeFile, _ := readMetricsFromFile(retFile)
return c.saveSliceToRetry(wholeFile[:c.Lc.fileMetricSize], carbonAddr)
}
return nil
}
// Attempt to send metric to graphite server via connection
func (c *Client) tryToSendToGraphite(metric string, carbonAddr string, conn net.Conn) error {
// If at any point "HOSTNAME" was used instead of real hostname - replace it
metric = strings.Replace(metric, "HOSTNAME", c.Lc.hostname, -1)
_, err := conn.Write([]byte(metric + "\n"))
if err != nil {
c.Lc.lg.Println("Write to server failed:", err.Error())
return err
}
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].sent, 1)
return nil
}
// Run go routine per carbon server to:
// 1) Send data from retryFile to a carbon
// 2) Send metrics from monitoring channel to a carbon
// 3) Send metrics from the main channel to carbon
//
// And save everything to the retryFile on any error
func (c Client) runBackend(carbonAddr string) {
retFile := path.Join(c.Conf.RetryDir, carbonAddr)
chanLock.Lock()
monChannel := c.monChannels[carbonAddr]
mainChannel := c.mainChannels[carbonAddr]
chanLock.Unlock()
// TODO: think about graceful shutdown with flush channels
for ; ; time.Sleep(time.Duration(c.Conf.ClientSendInterval) * time.Second) {
var connectionFailed bool
// Try to dial to Graphite server. If ClientSendInterval is 10 seconds - dial should be no longer than 1 second
conn, err := net.DialTimeout("tcp", carbonAddr, time.Duration(c.Conf.ConnectTimeout)*time.Second)
if err != nil {
c.Lc.lg.Println("Can not connect to graphite server: ", err.Error())
c.saveChannelToRetry(monChannel, len(monChannel), carbonAddr)
c.saveChannelToRetry(mainChannel, len(mainChannel), carbonAddr)
c.removeOldDataFromRetryFile(carbonAddr)
continue
}
// We set dead line for connection to write. It should be the rest of we have for client interval
err = conn.SetWriteDeadline(time.Now().Add(time.Duration(c.Conf.ClientSendInterval-c.Conf.ConnectTimeout-1) * time.Second))
if err != nil {
c.Lc.lg.Println("Can not set deadline for connection: ", err.Error())
connectionFailed = true
}
// We send retry file first, we have a risk to lose old data
// Metrics from retry file are counted as extra metrics per second to have a chance to send them
// Otherwise we would only save new incomming metrics and continuously lose part of buffer
if !connectionFailed {
retryFileMetrics, _ := readMetricsFromFile(retFile)
for numOfMetricFromFile, metricFromFile := range retryFileMetrics {
if numOfMetricFromFile < c.Lc.mainBufferSize {
err = c.tryToSendToGraphite(metricFromFile, carbonAddr, conn)
if err != nil {
c.Lc.lg.Printf("Error happened in the middle of writing retry metrics. Resaving %d metrics\n", len(retryFileMetrics[numOfMetricFromFile:]))
// If we failed to write a metric to graphite - something is wrong with connection
c.saveSliceToRetry(retryFileMetrics[numOfMetricFromFile:], carbonAddr)
connectionFailed = true
break
} else {
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].fromRetry, 1)
}
} else {
c.Lc.lg.Printf("Can read only %d metrics from %s. Rest %d will be kept for the next run", numOfMetricFromFile, retFile, len(retryFileMetrics[numOfMetricFromFile:]))
c.saveSliceToRetry(retryFileMetrics[numOfMetricFromFile:], carbonAddr)
break
}
}
}
// Monitoring. We read it always and we reserved space for it
bufSize := len(monChannel)
if !connectionFailed {
for i := 0; i < bufSize; i++ {
err = c.tryToSendToGraphite(<-monChannel, carbonAddr, conn)
if err != nil {
c.Lc.lg.Println("Error happened in the middle of writing monitoring metrics. Saving...")
c.saveChannelToRetry(monChannel, bufSize-i, carbonAddr)
connectionFailed = true
break
}
}
} else {
c.saveChannelToRetry(monChannel, bufSize, carbonAddr)
}
// Main Buffer. We read it completely but send only part which fits in mainBufferSize
// Rests we save
bufSize = len(mainChannel)
if !connectionFailed {
for processedMainBuff := 0; processedMainBuff < bufSize; processedMainBuff = processedMainBuff + 1 {
metric := <-mainChannel
err = c.tryToSendToGraphite(metric, carbonAddr, conn)
if err != nil {
c.Lc.lg.Printf("Error happened in the middle of writing metrics. Saving %d metrics\n", bufSize-processedMainBuff)
c.saveChannelToRetry(mainChannel, bufSize-processedMainBuff, carbonAddr)
break
}
}
} else {
c.saveChannelToRetry(mainChannel, bufSize, carbonAddr)
}
conn.Close()
}
}
//Run a client, which:
// 1) Make monitoring and main channels per carbon server
// 2) Launchs go routine per carbon server
// 3) Copy metrics from monitoring and main channels to the carbon server specific
// Should be run in separate goroutine.
func (c Client) Run() {
err := c.createRetryDir()
if err != nil {
log.Fatal(err)
}
c.mainChannels = make(map[string]chan string)
c.monChannels = make(map[string]chan string)
for _, carbonAddr := range c.Conf.CarbonAddrs {
chanLock.Lock()
c.mainChannels[carbonAddr] = make(chan string, cap(c.Lc.mainChannel))
c.monChannels[carbonAddr] = make(chan string, cap(c.Lc.monitoringChannel))
chanLock.Unlock()
go c.runBackend(carbonAddr)
}
sup := supervisor(c.Conf.Supervisor)
for ; ; time.Sleep(time.Second) {
// Notify watchdog about aliveness of Client routine
sup.notify()
// write metrics from monitoring and main channels to the server specific channels
for i := 0; i < len(c.Lc.mainChannel); i++ {
metric := <-c.Lc.mainChannel
for _, carbonAddr := range c.Conf.CarbonAddrs {
select {
case c.mainChannels[carbonAddr] <- metric:
default:
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].dropped, 1)
}
}
}
for i := 0; i < len(c.Lc.monitoringChannel); i++ {
metric := <-c.Lc.monitoringChannel
for _, carbonAddr := range c.Conf.CarbonAddrs {
select {
case c.monChannels[carbonAddr] <- metric:
default:
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].dropped, 1)
}
}
}
}
}