Skip to content

Commit

Permalink
discovery refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ixre committed Dec 15, 2020
1 parent e56b1fc commit 35c0cd0
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 11 deletions.
16 changes: 9 additions & 7 deletions core/etcd/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ import (
"hash/crc32"
"log"
"net"
"os"
"strings"
"time"
)

var prefix = "/registry/server/"

type Registry interface {
// 创建租期/注册节点,返回租期ID和错误
Register(port int) (int64, error)
// 创建租期/注册节点,返回租期ID和错误, 如果IP为空,则默认为第一个网卡首个IP
Register(ip string,port int) (int64, error)
// 撤销租期/注销节点
Revoke(LeaseID int64) error
UnRegister()
Expand Down Expand Up @@ -60,8 +60,7 @@ func NewRegistry(service string, ttl int64, config clientv3.Config) (Registry, e
func (s *registryServer) resolveIp() string {
addrList, err := net.InterfaceAddrs()
if err != nil {
fmt.Println(err)
os.Exit(1)
log.Fatalln(err.Error())
}
for _, address := range addrList {
// 检查ip地址判断是否回环地址
Expand All @@ -73,13 +72,16 @@ func (s *registryServer) resolveIp() string {
}
return "127.0.0.1"
}
func (s *registryServer) Register(port int) (leaseId int64, err error) {
func (s *registryServer) Register(ip string,port int) (leaseId int64, err error) {
if s.isRegistry {
panic("only one nodes can be registered")
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.ttl)*time.Second)
defer cancel()
addr := fmt.Sprintf("%s:%d", s.resolveIp(), port)
if len(strings.TrimSpace(ip)) == 0 {
ip = s.resolveIp()
}
addr := fmt.Sprintf("%s:%d", ip, port)
// 创建租约
grant, err := s.cli.Grant(context.Background(), s.ttl)
if err != nil {
Expand Down
24 changes: 23 additions & 1 deletion core/service/discovery.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package service

import (
"fmt"
"github.com/ixre/gof/log"
"go.etcd.io/etcd/clientv3"
"go2o/core/etcd"
"net"
)

/**
Expand All @@ -24,8 +27,27 @@ func registerServiceDiscovery(cfg *clientv3.Config, port int) {
if err != nil {
panic(err)
}
_, err = r.Register(port)
ip := resolveIp()
_, err = r.Register(ip,port)
if err != nil {
panic(err)
}
log.Println(fmt.Sprintf("[ Go2o][ RPC]: server discovery register success. node: %s:%d",ip,port))
}


func resolveIp() string {
addrList, err := net.InterfaceAddrs()
if err != nil {
log.Fatalln(err.Error())
}
for _, address := range addrList {
// 检查ip地址判断是否回环地址
if i, ok := address.(*net.IPNet); ok && !i.IP.IsLoopback() {
if i.IP.To4() != nil {
return i.IP.String()
}
}
}
return "127.0.0.1"
}
7 changes: 5 additions & 2 deletions core/service/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,15 @@ func ServeRPC(ch chan bool, cfg *clientv3.Config, port int) {
proto.RegisterAppServiceServer(s, grpc2.AppService)
proto.RegisterRbacServiceServer(s, grpc2.RbacService)
registerServiceDiscovery(cfg, port)
log.Println("[ Go2o][ RPC]: server discovery register success")
go serveRPC(ch,s, port)
}

func serveRPC(ch chan bool, s *grpc.Server, port int) {
l, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
panic(err)
}
log.Println("[ Go2o][ API]: grpc node serve on port :"+strconv.Itoa(port))
log.Println("[ Go2o][ API]: grpc node serve on port :" + strconv.Itoa(port))
if err = s.Serve(l); err != nil {
ch <- false
panic(err)
Expand Down
2 changes: 1 addition & 1 deletion go2o-serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func main() {
// 初始化producer
_ = msq.Configure(msq.NATS, strings.Split(mqAddr, ","))
// 运行RPC服务
go service.ServeRPC(ch, &cfg, port)
service.ServeRPC(ch, &cfg, port)
service.ConfigureClient(cfg) // initial service client
if runDaemon {
//todo: daemon需重构
Expand Down

0 comments on commit 35c0cd0

Please sign in to comment.