init
This commit is contained in:
104
client/client.go
Normal file
104
client/client.go
Normal file
@@ -0,0 +1,104 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"git.kumo.work/shama/service/config"
|
||||
consulClient "git.kumo.work/shama/service/lib/consul"
|
||||
"github.com/smallnest/rpcx/client"
|
||||
"log"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var mClient = sync.Map{}
|
||||
var mutex = sync.Mutex{}
|
||||
var basePkgPath string
|
||||
|
||||
type empty int
|
||||
|
||||
func init() {
|
||||
pkgPath := reflect.TypeOf(empty(0)).PkgPath()
|
||||
basePkgPath = pkgPath[:len(pkgPath)-6]
|
||||
}
|
||||
|
||||
type RpcClient struct {
|
||||
baseName string
|
||||
serviceName string
|
||||
client client.XClient
|
||||
}
|
||||
|
||||
// Call @Title 调用接口
|
||||
func (r RpcClient) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) {
|
||||
if config.RpcConfig.BeforeHandel != nil {
|
||||
ctx, err = config.RpcConfig.BeforeHandel(ctx, r.baseName, r.serviceName, serviceMethod, args, reply)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// 调用请求
|
||||
err = r.client.Call(ctx, serviceMethod, args, reply)
|
||||
if config.RpcConfig.AfterHandel != nil {
|
||||
err = config.RpcConfig.AfterHandel(ctx, r.baseName, r.serviceName, serviceMethod, args, reply, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// GetClient @Title 获取RPC客户的
|
||||
func GetClient(s interface{}) (*RpcClient, error) {
|
||||
path := strings.TrimPrefix(reflect.ValueOf(s).Elem().Type().PkgPath(), basePkgPath)
|
||||
actionName := reflect.ValueOf(s).Elem().Type().Name()
|
||||
key := path + "/" + actionName
|
||||
split := strings.SplitN(key, "/", 2)
|
||||
basePath := split[0]
|
||||
servicePath := split[1]
|
||||
xClient, ok := mClient.Load(key)
|
||||
if !ok {
|
||||
mutex.Lock()
|
||||
xClient, ok = mClient.Load(key)
|
||||
if !ok {
|
||||
d, err := consulClient.NewConsulDiscovery(basePath, servicePath, config.RpcConfig.RegistryServer, nil)
|
||||
if err != nil {
|
||||
return nil, errors.New("系统异常")
|
||||
}
|
||||
option := client.DefaultOption
|
||||
option.Retries = 3
|
||||
option.GenBreaker = func() client.Breaker {
|
||||
return client.NewConsecCircuitBreaker(2, 30*time.Second)
|
||||
}
|
||||
xClient = client.NewXClient(servicePath, client.Failover, client.RoundRobin, d, option)
|
||||
mClient.Store(key, xClient)
|
||||
}
|
||||
mutex.Unlock()
|
||||
}
|
||||
return &RpcClient{client: xClient.(client.XClient), baseName: basePath, serviceName: servicePath}, nil
|
||||
}
|
||||
|
||||
// GetClientName @Title 根据服务名获取客户端
|
||||
func GetClientName(base, service string) (*RpcClient, error) {
|
||||
key := fmt.Sprintf("%s/%s", base, service)
|
||||
xClient, ok := mClient.Load(key)
|
||||
if !ok {
|
||||
mutex.Lock()
|
||||
xClient, ok = mClient.Load(key)
|
||||
if !ok {
|
||||
d, err := consulClient.NewConsulDiscovery(base, service, config.RpcConfig.RegistryServer, nil)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return nil, errors.New("系统异常")
|
||||
}
|
||||
option := client.DefaultOption
|
||||
option.Retries = 3
|
||||
option.GenBreaker = func() client.Breaker {
|
||||
return client.NewConsecCircuitBreaker(2, 30*time.Second)
|
||||
}
|
||||
xClient = client.NewXClient(service, client.Failover, client.RoundRobin, d, option)
|
||||
mClient.Store(key, xClient)
|
||||
}
|
||||
mutex.Unlock()
|
||||
}
|
||||
return &RpcClient{client: xClient.(client.XClient), baseName: base, serviceName: service}, nil
|
||||
}
|
||||
Reference in New Issue
Block a user