Skip to content

Commit

Permalink
Add support for Node addresses
Browse files Browse the repository at this point in the history
Adds Validate() function which checks Endpoint IPs and Node Addresses
for dispatcher set membership authorization.
  • Loading branch information
Ulexus committed Jan 21, 2020
1 parent 35068f8 commit 41e75a9
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 26 deletions.
7 changes: 5 additions & 2 deletions endpoints/endpoints.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
// Package endpoints provides kubernetes Endpoint IP retrieval
//
// Deprecated: this trivial package is left due to potential external API
// needs. It is unused by the dispatchers package in favour of the internal
// version of this package which supplies additional structure.
package endpoints

import (
Expand All @@ -13,7 +18,6 @@ import (
// namespace. If the namespace is empty, the `default` namespace
// will be used.
func Get(ctx context.Context, c *k8s.Client, epNamespace, epName string) ([]string, error) {

ep := new(corev1.Endpoints)
if err := c.Get(ctx, epNamespace, epName, ep); err != nil {
return nil, errors.Wrap(err, "failed to list endpoints")
Expand All @@ -33,7 +37,6 @@ func Get(ctx context.Context, c *k8s.Client, epNamespace, epName string) ([]stri
// when a change occurs. If an error occurs, the error will be sent down the
// channel and the watch will terminate.
func Watch(ctx context.Context, c *k8s.Client, changes chan error, namespace string) error {

epList := new(corev1.Endpoints)
w, err := c.Watch(ctx, namespace, epList)
if err != nil {
Expand Down
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ require (
github.com/CyCoreSystems/go-kamailio v0.2.0
github.com/ericchiang/k8s v1.1.0
github.com/ghodss/yaml v1.0.0
github.com/golang/protobuf v1.1.0 // indirect
github.com/pkg/errors v0.8.0
golang.org/x/net v0.0.0-20180502164142-640f4622ab69 // indirect
github.com/golang/protobuf v1.2.0 // indirect
github.com/pkg/errors v0.8.1
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 // indirect
golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect
golang.org/x/text v0.3.0 // indirect
gopkg.in/yaml.v2 v2.2.1 // indirect
)
14 changes: 8 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ github.com/ericchiang/k8s v1.1.0 h1:XjBbrZhlvos0PtQrvvSIPAeinnrYM4c/QKB0CWfnoJU=
github.com/ericchiang/k8s v1.1.0/go.mod h1:/OmBgSq2cd9IANnsGHGlEz27nwMZV2YxlpXuQtU3Bz4=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/golang/protobuf v1.1.0 h1:0iH4Ffd/meGoXqF2lSAhZHt8X+cPgkfn/cb6Cce5Vpc=
github.com/golang/protobuf v1.1.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
golang.org/x/net v0.0.0-20180502164142-640f4622ab69 h1:+Ybm3UzSfPpp+Hlr62ZTCtbC9DmCKX61f0r74+peGts=
golang.org/x/net v0.0.0-20180502164142-640f4622ab69/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
78 changes: 78 additions & 0 deletions internal/endpoints/endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Package endpoints provides kubernetes Endpoint IP retrieval
package endpoints

import (
"context"
"log"

"github.com/ericchiang/k8s"
corev1 "github.com/ericchiang/k8s/apis/core/v1"

"github.com/pkg/errors"
)

// Endpoints describes the set of Endpoints for a Service as well as the addresses of the Nodes on which those Endpoints run
type Endpoints struct {

// Addresses is the list of IP addresses for the Endpoints
Addresses []string

// NodeAddresses is the list of Node addresses on which the Endpoints run
NodeAddresses []string
}

// Get retrieves the IP addresses for a named endpoint in a given
// namespace. If the namespace is empty, the `default` namespace
// will be used.
func Get(ctx context.Context, c *k8s.Client, epNamespace, epName string) (ret Endpoints, err error) {
nodes := make(map[string]*corev1.Node)

ep := new(corev1.Endpoints)
if err := c.Get(ctx, epNamespace, epName, ep); err != nil {
return ret, errors.Wrap(err, "failed to list endpoints")
}

for _, s := range ep.GetSubsets() {
for _, a := range s.GetAddresses() {
ret.Addresses = append(ret.Addresses, a.GetIp())

epNode := new(corev1.Node)
if err = c.Get(ctx, epNamespace, a.GetNodeName(), epNode); err != nil {
log.Printf("WARNING: failed to get node %s for endpoint %s: %v", a.GetNodeName(), ep.String(), err)
}
}
}

for _, n := range nodes {
for _, a := range n.GetStatus().GetAddresses() {
ret.NodeAddresses = append(ret.NodeAddresses, a.GetAddress())
}
}

return
}

// Watch watches a namespace and returns a nil error on the provided channel
// when a change occurs. If an error occurs, the error will be sent down the
// channel and the watch will terminate.
func Watch(ctx context.Context, c *k8s.Client, changes chan error, namespace string) error {
epList := new(corev1.Endpoints)
w, err := c.Watch(ctx, namespace, epList)
if err != nil {
changes <- errors.Wrap(err, "failed to watch namespace")
return errors.Wrap(err, "failed to watch namespace")
}
defer w.Close() // nolint: errcheck

for ctx.Err() == nil {
ep := new(corev1.Endpoints)
_, err := w.Next(ep)
if err != nil {
changes <- errors.Wrap(err, "failure during watch")
return errors.Wrap(err, "failure during watch")
}
changes <- nil
}

return ctx.Err()
}
7 changes: 1 addition & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,7 @@ func (s *dispatcherSets) validateSetMember(id int, addr string) bool {
if !ok {
return false
}
for _, ref := range selectedSet.Hosts() {
if ref == addr {
return true
}
}
return false
return selectedSet.Validate(addr)
}

// notify signals to kamailio to reload its dispatcher list
Expand Down
42 changes: 34 additions & 8 deletions sets/sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strings"
"time"

"github.com/CyCoreSystems/dispatchers/endpoints"
"github.com/CyCoreSystems/dispatchers/internal/endpoints"
"github.com/ericchiang/k8s"
"github.com/pkg/errors"
)
Expand All @@ -31,6 +31,9 @@ type DispatcherSet interface {
// Update causes the dispatcher set to be updated
Update(context.Context) (changed bool, err error)

// Validate checks an address for membership in the set
Validate(a string) bool

// Watch waits for the dispatcher set to change, returning the new value when that change occurs.
Watch(context.Context) (string, error)
}
Expand All @@ -51,6 +54,15 @@ func (s *staticSet) Hosts() []string {
return s.Members
}

func (s *staticSet) Validate(a string) bool {
for _, m := range s.Members {
if m == a {
return true
}
}
return false
}

func (s *staticSet) Export() string {
ret := fmt.Sprintf("# Dispatcher set %d\n", s.id)

Expand Down Expand Up @@ -97,6 +109,9 @@ type kubernetesSet struct {
// members is the list of members of this set
members []string

// nodeAddresses is the list of addresses belonging to the nodes on whic the members are running
nodeAddresses []string

// endpointName is the name of the Kubernetes Endpoint List
// from which the dispatcher endpoints should be derived.
endpointName string
Expand Down Expand Up @@ -167,19 +182,34 @@ func (s *kubernetesSet) Export() string {

// Update updates the list of proxies
func (s *kubernetesSet) Update(ctx context.Context) (changed bool, err error) {
list, err := s.getEndpoints(ctx)
eps, err := endpoints.Get(ctx, s.kc, s.endpointNamespace, s.endpointName)
if err != nil {
return
}

if differ(s.members, list) {
if differ(s.members, eps.Addresses) {
changed = true
}
s.members = list
s.members = eps.Addresses
s.nodeAddresses = eps.NodeAddresses

return
}

func (s *kubernetesSet) Validate(a string) bool {
for _, m := range s.members {
if a == m {
return true
}
}
for _, m := range s.nodeAddresses {
if a == m {
return true
}
}
return false
}

func (s *kubernetesSet) Watch(ctx context.Context) (string, error) {
for ctx.Err() == nil {
select {
Expand All @@ -202,10 +232,6 @@ func (s *kubernetesSet) Watch(ctx context.Context) (string, error) {
return s.Export(), ctx.Err()
}

func (s *kubernetesSet) getEndpoints(ctx context.Context) ([]string, error) {
return endpoints.Get(ctx, s.kc, s.endpointNamespace, s.endpointName)
}

func (s *kubernetesSet) maintainWatch(ctx context.Context) {
for ctx.Err() == nil {

Expand Down

0 comments on commit 41e75a9

Please sign in to comment.