Skip to content

Commit

Permalink
Merge pull request #201 from negbie/master
Browse files Browse the repository at this point in the history
Allow webconfig
  • Loading branch information
negbie committed Mar 12, 2019
2 parents 41bda7a + 417076a commit f60f1be
Show file tree
Hide file tree
Showing 18 changed files with 358 additions and 298 deletions.
121 changes: 94 additions & 27 deletions cmd/heplify-server/heplify-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ package main

import (
"fmt"
"html/template"
"io/ioutil"
"net/http"
"os"
"os/signal"
"runtime/debug"
"strings"
"sync"
"syscall"

//"net"
//_ "net/http/pprof"

"github.com/koding/multiconfig"
"github.com/negbie/heplify-server/config"
input "github.com/negbie/heplify-server/server"
"github.com/negbie/logp"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type server interface {
Expand Down Expand Up @@ -70,38 +70,105 @@ func tomlExists(f string) bool {
}

func main() {
if config.Setting.Version {
fmt.Println(config.Version)
os.Exit(0)
}
var servers []server
var wg sync.WaitGroup
var sigCh = make(chan os.Signal, 1)

//go http.ListenAndServe(":8181", http.DefaultServeMux)
debug.SetGCPercent(50)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
hep := input.NewHEPInput()
servers := []server{hep}

/* autopprof.Capture(autopprof.CPUProfile{
Duration: 15 * time.Second,
}) */

for _, srv := range servers {
wg.Add(1)
go func(s server) {
defer wg.Done()
s.Run()
}(srv)
startServer := func() {
hep := input.NewHEPInput()
servers = []server{hep}
for _, srv := range servers {
wg.Add(1)
go func(s server) {
defer wg.Done()
s.Run()
}(srv)
}
}
endServer := func() {
for _, srv := range servers {
wg.Add(1)
go func(s server) {
defer wg.Done()
s.End()
}(srv)
}
wg.Wait()
}

<-sigCh
if len(config.Setting.ConfigHTTPAddr) > 2 {
cfgFile := config.Setting.Config
tmpl := template.Must(template.New("main").Parse(configForm))
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
tmpl.Execute(w, nil)
return
}

ioutil.WriteFile(cfgFile, []byte(r.FormValue("config")), 0644)
cf := multiconfig.NewWithPath(cfgFile)
cfg := new(config.HeplifyServer)
err := cf.Load(cfg)
if err != nil {
logp.Warn("Failed config reload from %v. %v", r.RemoteAddr, err)
tmpl.Execute(w, struct {
Success bool
Err error
}{false, err})
} else {
logp.Info("Successfull config reloaded from %v", r.RemoteAddr)
endServer()
config.Setting = *cfg
startServer()
tmpl.Execute(w, struct {
Success bool
Err error
}{true, nil})
}
})

go http.ListenAndServe(config.Setting.ConfigHTTPAddr, nil)
}

for _, srv := range servers {
wg.Add(1)
go func(s server) {
defer wg.Done()
s.End()
}(srv)
if promAddr := config.Setting.PromAddr; len(promAddr) > 2 {
go func() {
http.Handle("/metrics", promhttp.Handler())
err := http.ListenAndServe(promAddr, nil)
if err != nil {
logp.Err("%v", err)
}
}()
}
wg.Wait()

startServer()
<-sigCh
endServer()
}

var configForm = `
<!DOCTYPE html>
<html>
<head>
<title>heplify-server web config</title>
</head>
<body>
<h2>heplify-server.toml</h2>
{{if .Success}}
<h4>Success!</h4>
{{end}}
{{if not .Success}}
<h4>{{.Err}}</h4>
{{end}}
<form method="POST">
<label>Config:</label><br />
<textarea type="text" name="config" style="font-family: Arial;font-size: 10pt;width:80%;height:25vw"></textarea><br />
<input type="submit" class="like" value="Apply config" />
</form>
</body>
</html>
`
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package config

const Version = "heplify-server 1.06"
const Version = "heplify-server 1.07"

var Setting HeplifyServer

Expand Down Expand Up @@ -52,7 +52,7 @@ type HeplifyServer struct {
LogLvl string `default:"info"`
LogStd bool `default:"false"`
Config string `default:"./heplify-server.toml"`
Version bool `default:"false"`
ConfigHTTPAddr string `default:""`
}

func NewConfig() *HeplifyServer {
Expand Down Expand Up @@ -102,7 +102,7 @@ func NewConfig() *HeplifyServer {
LogLvl: "info",
LogStd: false,
Config: "./heplify-server.toml",
Version: false,
ConfigHTTPAddr: "",
}
}

Expand Down
2 changes: 2 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/negbie/heplify-server/config"
"github.com/negbie/heplify-server/decoder"
"github.com/negbie/logp"
)

type Database struct {
Expand Down Expand Up @@ -67,6 +68,7 @@ func (d *Database) Run() error {

func (d *Database) End() {
close(d.Chan)
logp.Info("close %s channel", config.Setting.DBDriver)
}

func connectString(dbName string) (string, error) {
Expand Down
8 changes: 2 additions & 6 deletions database/mariafiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var insconfmaria = []string{
`INSERT INTO alias (id, gid, ip, port, capture_id, alias, status, created) VALUES
(1, 10, '192.168.0.30', 0, 'homer01', 'proxy01', 1, '2014-06-12 20:36:50');`,

`INSERT INTO group (gid, name) VALUES (10, 'Administrator');`,
"INSERT INTO `group` (`gid`, `name`) VALUES (10, 'Administrator');",

`INSERT INTO node (id, host, dbname, dbport, dbusername, dbpassword, dbtables, name, status) VALUES
(1, '127.0.0.1', 'homer_data', '3306', 'homer_user', 'homer_password', 'sip_capture', 'homer01', 1);`,
Expand Down Expand Up @@ -70,11 +70,7 @@ var tblconfmaria = []string{
KEY host (ip)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;`,

`CREATE TABLE IF NOT EXISTS group (
gid int(10) NOT NULL DEFAULT 0,
name varchar(100) NOT NULL DEFAULT '',
UNIQUE KEY gid (gid)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;`,
"CREATE TABLE IF NOT EXISTS `group` (gid int(10) NOT NULL DEFAULT 0,name varchar(100) NOT NULL DEFAULT '',UNIQUE KEY gid (gid)) ENGINE=InnoDB DEFAULT CHARSET=latin1;",

`CREATE TABLE IF NOT EXISTS link_share (
id int(10) NOT NULL AUTO_INCREMENT,
Expand Down
5 changes: 3 additions & 2 deletions database/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (m *MySQL) setup() error {

if config.Setting.DBRotate {
r := NewRotator()
r.Rotate()
go r.Rotate()
}

if m.db, err = sql.Open(config.Setting.DBDriver, cs); err != nil {
Expand Down Expand Up @@ -210,7 +210,8 @@ func (m *MySQL) insert(hCh chan *decoder.HEP) {
select {
case pkt, ok = <-hCh:
if !ok {
break
m.db.Close()
return
}

if pkt.ProtoType == 1 && pkt.Payload != "" && pkt.SIP != nil {
Expand Down
5 changes: 3 additions & 2 deletions database/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (p *Postgres) setup() error {

if config.Setting.DBRotate {
r := NewRotator()
r.Rotate()
go r.Rotate()
}

if p.db, err = sql.Open(config.Setting.DBDriver, cs); err != nil {
Expand Down Expand Up @@ -96,7 +96,8 @@ func (p *Postgres) insert(hCh chan *decoder.HEP) {
select {
case pkt, ok := <-hCh:
if !ok {
break
p.db.Close()
return
}

date := pkt.Timestamp.Format(time.RFC3339Nano)
Expand Down
15 changes: 7 additions & 8 deletions database/rotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ func (r *Rotator) CreateDatabases() (err error) {
return nil
}

func replaceDay(d int) strings.Replacer {
func replaceDay(d int) *strings.Replacer {
pn := time.Now().Add(time.Hour * time.Duration(24*d)).Format("20060102")
return *strings.NewReplacer(
return strings.NewReplacer(
"{{date}}", pn,
)
}
Expand Down Expand Up @@ -205,7 +205,7 @@ func (r *Rotator) dbExec(db *sql.DB, query string) {
checkDBErr(err)
}

func (r *Rotator) dbExecFile(db *sql.DB, file []string, pattern strings.Replacer, d, p int) error {
func (r *Rotator) dbExecFile(db *sql.DB, file []string, pattern *strings.Replacer, d, p int) error {
t := time.Now().Add(time.Hour * time.Duration(24*d))
tt := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
newMinTime := tt.Format("1504")
Expand All @@ -226,7 +226,7 @@ func (r *Rotator) dbExecFile(db *sql.DB, file []string, pattern strings.Replacer
return lastErr
}

func (r *Rotator) dbExecFileLoop(db *sql.DB, file []string, pattern strings.Replacer, d, p int) {
func (r *Rotator) dbExecFileLoop(db *sql.DB, file []string, pattern *strings.Replacer, d, p int) {
for _, q := range file {
q = pattern.Replace(q)
fileLoop(db, q, d, p)
Expand Down Expand Up @@ -352,10 +352,9 @@ func setStep(name string) (step int) {

func checkDBErr(err error) {
if err != nil {
if mErr, ok := err.(*mysql.MySQLError); ok && mErr.Number != 1050 &&
mErr.Number != 1062 && mErr.Number != 1481 && mErr.Number != 1517 {
logp.Warn("%s\n\n", err)

if mErr, ok := err.(*mysql.MySQLError); ok && (mErr.Number == 1050 ||
mErr.Number == 1062 || mErr.Number == 1481 || mErr.Number == 1517) {
logp.Debug("rotator", "%s\n\n", err)
} else {
logp.Warn("%s\n\n", err)
}
Expand Down
2 changes: 2 additions & 0 deletions metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"runtime"

"github.com/negbie/heplify-server/decoder"
"github.com/negbie/logp"
)

type Metric struct {
Expand Down Expand Up @@ -42,4 +43,5 @@ func (m *Metric) Run() error {

func (m *Metric) End() {
close(m.Chan)
logp.Info("close metric channel")
}
9 changes: 0 additions & 9 deletions metric/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package metric

import (
"fmt"
"net/http"
"os"
"os/signal"
"strings"
Expand All @@ -15,7 +14,6 @@ import (
"github.com/negbie/heplify-server/config"
"github.com/negbie/heplify-server/decoder"
"github.com/negbie/logp"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type Prometheus struct {
Expand Down Expand Up @@ -68,13 +66,6 @@ func (p *Prometheus) setup() (err error) {
return err
}

go func() {
http.Handle("/metrics", promhttp.Handler())
err = http.ListenAndServe(config.Setting.PromAddr, nil)
if err != nil {
logp.Err("%v", err)
}
}()
return err
}

Expand Down
16 changes: 2 additions & 14 deletions queue/nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,9 @@ func (n *NSQ) setup() error {
}

func (n *NSQ) add(topic string, qCh chan []byte) {
var (
msg []byte
err error
ok bool
)

logp.Info("Run NSQ Output, server: %s, topic: %s\n", config.Setting.MQAddr, topic)

for {
msg, ok = <-qCh
if !ok {
break
}

err = n.producer.Publish(topic, msg)
for msg := range qCh {
err := n.producer.Publish(topic, msg)
if err != nil {
logp.Err("%v", err)
}
Expand Down
Loading

0 comments on commit f60f1be

Please sign in to comment.