Skip to content

Commit

Permalink
feat: add support for legacy Endpoints sets
Browse files Browse the repository at this point in the history
Signed-off-by: Seán C McCord <ulexus@gmail.com>
  • Loading branch information
Ulexus committed Jun 13, 2021
1 parent f490822 commit d95f8c4
Showing 1 changed file with 185 additions and 3 deletions.
188 changes: 185 additions & 3 deletions sets/sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

"inet.af/netaddr"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -138,15 +139,15 @@ func NewKubernetesSet(ctx context.Context, f informers.SharedInformerFactory, se
port = "5060"
}

informer := f.Discovery().V1().EndpointSlices()

s := &kubernetesSet{
id: setID,
namespace: namespace,
name: name,
port: port,
}

informer := f.Discovery().V1().EndpointSlices()

informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: s.addFunc,
UpdateFunc: s.updateFunc,
Expand Down Expand Up @@ -217,7 +218,7 @@ func (s *kubernetesSet) Close() {}

func (s *kubernetesSet) State() *State {
return &State{
ID: s.id,
ID: s.id,
Endpoints: s.endpoints,
}
}
Expand Down Expand Up @@ -245,6 +246,187 @@ func (s *kubernetesSet) IsMember(addr string, port uint32) bool {
return false
}

// legacyKubernetesSet represents a dispatcher set whose
// data should be derived from Kubernetes.
type legacyKubernetesSet struct {
// id is the dispatch set index for this set
id int

endpoints []*Endpoint

// callbacks is the set of functions which should be called when the endpoint membership changes.
callbacks []func(*State)

// name is the name of the Kubernetes Endpoint List
// from which the dispatcher endpoints should be derived.
name string

// namespace is the namespace in which the Endpoint
// should be found.
namespace string

port string

mu sync.Mutex
}

// NewLegacyKunbernetesSet returns a new Kubernetes-based dispatcher set using the older Endpoints method.
// If the server is running an older version of Kubernetes which does not support v1.EndpointSlices (<v1.21), use this Kubernetes set.
//
// * `setID` is the dispatcher set's id
//
// * `namespace` is the namespace of the Service whose endpoints will describe this dispatcher set.
//
// * `name` is the name of the Service whose endpoints will describe this dispatcher set.
//
// * `port` is the port reference of the SIP endpoints this set describes. This is optional, and if not specified, will default to "5060".
//
func NewLegacyKubernetesSet(ctx context.Context, f informers.SharedInformerFactory, setID int, namespace, name, port string) (DispatcherSet, error) {
if port == "" {
port = "5060"
}

s := &legacyKubernetesSet{
id: setID,
namespace: namespace,
name: name,
port: port,
}

informer := f.Core().V1().Endpoints()

informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: s.addFunc,
UpdateFunc: s.updateFunc,
DeleteFunc: s.deleteFunc,
})

go informer.Informer().Run(ctx.Done())

return s, nil
}

func (s *legacyKubernetesSet) updateSet(obj interface{}) {
epList, ok := obj.(*v1.Endpoints)
if !ok {
return
}

if epList.Namespace != s.namespace {
return
}

if epList.Name != s.name {
return
}

list, err := flattenEndpoints(s.port, epList)
if err != nil {
return
}

s.mu.Lock()
if !isChanged(s.endpoints, list) {
s.mu.Unlock()
return
}

s.endpoints = list
s.mu.Unlock()

state := &State{
ID: s.id,
Endpoints: list,
}

for _, f := range s.callbacks {
f(state)
}
}

func (s *legacyKubernetesSet) addFunc(obj interface{}) {
s.updateSet(obj)
}

func (s *legacyKubernetesSet) updateFunc(old interface{}, obj interface{}) {
s.updateSet(obj)
}

func (s *legacyKubernetesSet) deleteFunc(obj interface{}) {
s.updateSet(obj)
}

func (s *legacyKubernetesSet) Close() {}

func (s *legacyKubernetesSet) State() *State {
return &State{
ID: s.id,
Endpoints: s.endpoints,
}
}

func (s *legacyKubernetesSet) RegisterChangeFunc(f func(*State)) {
s.mu.Lock()

s.callbacks = append(s.callbacks, f)

defer s.mu.Unlock()
}

func (s *legacyKubernetesSet) IsMember(addr string, port uint32) bool {
for _, ep := range s.endpoints {
if ep.Address == addr {

if port > 0 {
if ep.Port != port {
return false
}
}
return true
}
}
return false
}

func flattenEndpoints(refPort string, epList *v1.Endpoints) (out []*Endpoint, err error) {
parsedPortNumber, err := strconv.Atoi(refPort)
if err != nil {
parsedPortNumber = 0
}

portNumber := uint32(parsedPortNumber)

for _, ss := range epList.Subsets {

if portNumber == 0 {
for _, p := range ss.Ports {
if p.Name == "" {
continue
}

if p.Name == refPort {
if p.Port > 0 {
portNumber = uint32(p.Port)
}
}
}

if portNumber == 0 {
return nil, fmt.Errorf("failed to find port %s in Endpoints %q", refPort, epList.Name)
}
}

for _, addr := range ss.Addresses {
out = append(out, &Endpoint{
Address: addr.IP,
Port: portNumber,
})
}
}

return out, nil
}

func flattenEndpointSlice(refPort string, epSlice *discoveryv1.EndpointSlice) (out []*Endpoint, err error) {
portNumber, err := strconv.Atoi(refPort)
if err != nil {
Expand Down

0 comments on commit d95f8c4

Please sign in to comment.