245 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			245 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package consul
 | |
| 
 | |
| import (
 | |
| 	"path"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/rpcxio/libkv"
 | |
| 	"github.com/rpcxio/libkv/store"
 | |
| 	"github.com/rpcxio/libkv/store/consul"
 | |
| 	"github.com/smallnest/rpcx/client"
 | |
| 	"github.com/smallnest/rpcx/log"
 | |
| )
 | |
| 
 | |
| func init() {
 | |
| 	consul.Register()
 | |
| }
 | |
| 
 | |
| // ConsulDiscovery is a consul service discovery.
 | |
| // It always returns the registered servers in consul.
 | |
| type ConsulDiscovery struct {
 | |
| 	basePath string
 | |
| 	kv       store.Store
 | |
| 	pairsMu  sync.RWMutex
 | |
| 	pairs    []*client.KVPair
 | |
| 	chans    []chan []*client.KVPair
 | |
| 	mu       sync.Mutex
 | |
| 	// -1 means it always retry to watch until zookeeper is ok, 0 means no retry.
 | |
| 	RetriesAfterWatchFailed int
 | |
| 
 | |
| 	filter client.ServiceDiscoveryFilter
 | |
| 
 | |
| 	stopCh chan struct{}
 | |
| }
 | |
| 
 | |
| // NewConsulDiscovery returns a new ConsulDiscovery.
 | |
| func NewConsulDiscovery(basePath, servicePath string, consulAddr []string, options *store.Config) (*ConsulDiscovery, error) {
 | |
| 	kv, err := libkv.NewStore(store.CONSUL, consulAddr, options)
 | |
| 	if err != nil {
 | |
| 		log.Infof("cannot create store: %v", err)
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return NewConsulDiscoveryStore(basePath+"/"+servicePath, kv)
 | |
| }
 | |
| 
 | |
| // NewConsulDiscoveryStore returns a new ConsulDiscovery with specified store.
 | |
| func NewConsulDiscoveryStore(basePath string, kv store.Store) (*ConsulDiscovery, error) {
 | |
| 	if basePath[0] == '/' {
 | |
| 		basePath = basePath[1:]
 | |
| 	}
 | |
| 
 | |
| 	if len(basePath) > 1 && strings.HasSuffix(basePath, "/") {
 | |
| 		basePath = basePath[:len(basePath)-1]
 | |
| 	}
 | |
| 
 | |
| 	d := &ConsulDiscovery{basePath: basePath, kv: kv}
 | |
| 	d.stopCh = make(chan struct{})
 | |
| 
 | |
| 	ps, err := kv.List(basePath)
 | |
| 	if err != nil && err != store.ErrKeyNotFound {
 | |
| 		log.Infof("cannot get services of from registry: %v, err: %v", basePath, err)
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	pairs := make([]*client.KVPair, 0, len(ps))
 | |
| 	prefix := d.basePath + "/"
 | |
| 	for _, p := range ps {
 | |
| 		if path.Dir(p.Key) != d.basePath {
 | |
| 			continue
 | |
| 		}
 | |
| 		k := strings.TrimPrefix(p.Key, prefix)
 | |
| 		pair := &client.KVPair{Key: k, Value: string(p.Value)}
 | |
| 		if d.filter != nil && !d.filter(pair) {
 | |
| 			continue
 | |
| 		}
 | |
| 		pairs = append(pairs, pair)
 | |
| 	}
 | |
| 	d.pairsMu.Lock()
 | |
| 	d.pairs = pairs
 | |
| 	d.pairsMu.Unlock()
 | |
| 	d.RetriesAfterWatchFailed = -1
 | |
| 	go d.watch()
 | |
| 	return d, nil
 | |
| }
 | |
| 
 | |
| // NewConsulDiscoveryTemplate returns a new ConsulDiscovery template.
 | |
| func NewConsulDiscoveryTemplate(basePath string, consulAddr []string, options *store.Config) (*ConsulDiscovery, error) {
 | |
| 	if basePath[0] == '/' {
 | |
| 		basePath = basePath[1:]
 | |
| 	}
 | |
| 
 | |
| 	if len(basePath) > 1 && strings.HasSuffix(basePath, "/") {
 | |
| 		basePath = basePath[:len(basePath)-1]
 | |
| 	}
 | |
| 
 | |
| 	kv, err := libkv.NewStore(store.CONSUL, consulAddr, options)
 | |
| 	if err != nil {
 | |
| 		log.Infof("cannot create store: %v", err)
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return NewConsulDiscoveryStore(basePath, kv)
 | |
| }
 | |
| 
 | |
| // Clone clones this ServiceDiscovery with new servicePath.
 | |
| func (d *ConsulDiscovery) Clone(servicePath string) (client.ServiceDiscovery, error) {
 | |
| 	return NewConsulDiscoveryStore(d.basePath+"/"+servicePath, d.kv)
 | |
| }
 | |
| 
 | |
| // SetFilter sets the filer.
 | |
| func (d *ConsulDiscovery) SetFilter(filter client.ServiceDiscoveryFilter) {
 | |
| 	d.filter = filter
 | |
| }
 | |
| 
 | |
| // GetServices returns the servers
 | |
| func (d *ConsulDiscovery) GetServices() []*client.KVPair {
 | |
| 	d.pairsMu.RLock()
 | |
| 	defer d.pairsMu.RUnlock()
 | |
| 	return d.pairs
 | |
| }
 | |
| 
 | |
| // WatchService returns a nil chan.
 | |
| func (d *ConsulDiscovery) WatchService() chan []*client.KVPair {
 | |
| 	d.mu.Lock()
 | |
| 	defer d.mu.Unlock()
 | |
| 
 | |
| 	ch := make(chan []*client.KVPair, 10)
 | |
| 	d.chans = append(d.chans, ch)
 | |
| 	return ch
 | |
| }
 | |
| 
 | |
| func (d *ConsulDiscovery) RemoveWatcher(ch chan []*client.KVPair) {
 | |
| 	d.mu.Lock()
 | |
| 	defer d.mu.Unlock()
 | |
| 
 | |
| 	var chans []chan []*client.KVPair
 | |
| 	for _, c := range d.chans {
 | |
| 		if c == ch {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		chans = append(chans, c)
 | |
| 	}
 | |
| 
 | |
| 	d.chans = chans
 | |
| }
 | |
| 
 | |
| func (d *ConsulDiscovery) watch() {
 | |
| 	defer func() {
 | |
| 		d.kv.Close()
 | |
| 	}()
 | |
| 	for {
 | |
| 		var err error
 | |
| 		var c <-chan []*store.KVPair
 | |
| 		var tempDelay time.Duration
 | |
| 
 | |
| 		retry := d.RetriesAfterWatchFailed
 | |
| 		for d.RetriesAfterWatchFailed < 0 || retry >= 0 {
 | |
| 			c, err = d.kv.WatchTree(d.basePath, d.stopCh)
 | |
| 			if err != nil {
 | |
| 				if d.RetriesAfterWatchFailed > 0 {
 | |
| 					retry--
 | |
| 				}
 | |
| 				if tempDelay == 0 {
 | |
| 					tempDelay = 1 * time.Second
 | |
| 				} else {
 | |
| 					tempDelay *= 2
 | |
| 				}
 | |
| 				if max := 30 * time.Second; tempDelay > max {
 | |
| 					tempDelay = max
 | |
| 				}
 | |
| 				log.Warnf("can not watchtree (with retry %d, sleep %v): %s: %v", retry, tempDelay, d.basePath, err)
 | |
| 				time.Sleep(tempDelay)
 | |
| 				continue
 | |
| 			}
 | |
| 			break
 | |
| 		}
 | |
| 
 | |
| 		if err != nil {
 | |
| 			log.Errorf("can't watch %s: %v", d.basePath, err)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		prefix := d.basePath + "/"
 | |
| 
 | |
| 	readChanges:
 | |
| 		for {
 | |
| 			select {
 | |
| 			case <-d.stopCh:
 | |
| 				log.Info("discovery has been closed")
 | |
| 				return
 | |
| 			case ps, ok := <-c:
 | |
| 				if !ok {
 | |
| 					break readChanges
 | |
| 				}
 | |
| 				var pairs []*client.KVPair // latest servers
 | |
| 				if ps == nil {
 | |
| 					d.pairsMu.Lock()
 | |
| 					d.pairs = pairs
 | |
| 					d.pairsMu.Unlock()
 | |
| 					continue
 | |
| 				}
 | |
| 				for _, p := range ps {
 | |
| 					if path.Dir(p.Key) != d.basePath {
 | |
| 						continue
 | |
| 					}
 | |
| 					k := strings.TrimPrefix(p.Key, prefix)
 | |
| 					pair := &client.KVPair{Key: k, Value: string(p.Value)}
 | |
| 					if d.filter != nil && !d.filter(pair) {
 | |
| 						continue
 | |
| 					}
 | |
| 					pairs = append(pairs, pair)
 | |
| 				}
 | |
| 				d.pairsMu.Lock()
 | |
| 				d.pairs = pairs
 | |
| 				d.pairsMu.Unlock()
 | |
| 
 | |
| 				d.mu.Lock()
 | |
| 				for _, ch := range d.chans {
 | |
| 					ch := ch
 | |
| 					go func() {
 | |
| 						defer func() {
 | |
| 							recover()
 | |
| 						}()
 | |
| 						select {
 | |
| 						case ch <- pairs:
 | |
| 						case <-time.After(time.Minute):
 | |
| 							log.Warn("chan is full and new change has been dropped")
 | |
| 						}
 | |
| 					}()
 | |
| 				}
 | |
| 				d.mu.Unlock()
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		log.Warn("chan is closed and will rewatch")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (d *ConsulDiscovery) Close() {
 | |
| 	close(d.stopCh)
 | |
| }
 |