Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
feature:add api /peer/dynamicrate
Browse files Browse the repository at this point in the history
Signed-off-by: zhaxzhax <[email protected]>
  • Loading branch information
zhaxzhax committed Aug 7, 2020
1 parent 3e616d5 commit d890733
Show file tree
Hide file tree
Showing 14 changed files with 148 additions and 8 deletions.
34 changes: 34 additions & 0 deletions apis/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,40 @@ paths:
500:
$ref: "#/responses/500ErrorResponse"

/peer/dynamicrate:
get:
summary: "report the newest totalLimit to supernode"
produces:
- "application/json"
parameters:
- name: taskId
in: query
required: true
description: "ID of task"
type: string
- name: cid
in: query
type: string
required: true
description: |
the downloader clientID
- name: dynamicRate
in: query
type: integer
format: int64
required: true
description: |
the newest totalLimit of dfget server
responses:
200:
description: "no error"
schema:
$ref: "#/definitions/ResultInfo"
404:
$ref: "#/responses/404ErrorResponse"
500:
$ref: "#/responses/500ErrorResponse"

/api/v1/peers:
post:
summary: "register dfget in Supernode as a peer node"
Expand Down
1 change: 1 addition & 0 deletions client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type PeerAPIClient interface {
PeerDelete(ctx context.Context, id string) error
PeerInfo(ctx context.Context, id string) (peerInfoResponse *types.PeerInfo, err error)
PeerList(ctx context.Context, id string) (peersInfoResponse []*types.PeerInfo, err error)
PeerDynamicRate(ctx context.Context, id string, dynamicRate int64) error
}

// TaskAPIClient defines methods of task related client.
Expand Down
20 changes: 20 additions & 0 deletions dfget/core/api/supernode_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
metricsReportPath = "/task/metrics"
fetchP2PNetworkPath = "/peer/network"
peerHeartBeatPath = "/peer/heartbeat"
peerDynamicRatePath = "/peer/dynamicrate"
)

// NewSupernodeAPI creates a new instance of SupernodeAPI with default value.
Expand All @@ -64,6 +65,7 @@ type SupernodeAPI interface {
ReportResource(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, err error)
ApplyForSeedNode(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, err error)
ReportResourceDeleted(node string, taskID string, cid string) (resp *types.BaseResponse, err error)
ReportDynamicRate(node string, taskID string, cid string, dynamicRate int64) (resp *types.BaseResponse, err error)
}

type supernodeAPI struct {
Expand Down Expand Up @@ -353,3 +355,21 @@ func (api *supernodeAPI) HeartBeat(node string, req *api_types.HeartBeatRequest)
}
return resp, err
}

// ServiceDown reports the status of the local peer to supernode.
func (api *supernodeAPI) ReportDynamicRate(node string, taskID string, cid string, dynamicRate int64) (
resp *types.BaseResponse, err error) {

url := fmt.Sprintf("%s://%s%s?taskId=%s&cid=%s&dynamicRate=%d",
api.Scheme, node, peerDynamicRatePath, taskID, cid, dynamicRate)

resp = new(types.BaseResponse)
if err = api.get(url, resp); err != nil {
logrus.Errorf("failed to send dynamicRate,err: %v", err)
return nil, err
}
if resp.Code != constants.CodeUpdateDynamicRate {
logrus.Errorf("failed to send dynamicRate to supernode: api response code is %d not equal to %d", resp.Code, constants.CodeUpdateDynamicRate)
}
return
}
7 changes: 7 additions & 0 deletions dfget/core/api/supernode_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ func (s *SupernodeAPITestSuite) TestSupernodeAPI_ReportPiece(c *check.C) {
c.Check(r.Code, check.Equals, 611)
}

func (s *SupernodeAPITestSuite) TestSupernodeAPI_ReportDynamicRate(c *check.C) {
s.mock.GetFunc = s.mock.CreateGetFunc(200, []byte(`{"Code":200}`), nil)
r, e := s.api.ReportDynamicRate(localhost, "", "", 10000000)
c.Check(e, check.IsNil)
c.Check(r.Code, check.Equals, 200)
}

func (s *SupernodeAPITestSuite) TestSupernodeAPI_ServiceDown(c *check.C) {
s.mock.GetFunc = s.mock.CreateGetFunc(200, []byte(`{"Code":200}`), nil)
r, e := s.api.ServiceDown(localhost, "", "")
Expand Down
13 changes: 13 additions & 0 deletions dfget/core/helper/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ type ReportFuncType func(ip string, req *types.ReportPieceRequest) (*types.BaseR
// ServiceDownFuncType function type of SupernodeAPI#ServiceDown
type ServiceDownFuncType func(ip string, taskID string, cid string) (*types.BaseResponse, error)

// ReportDynamicRateFuncType function type of SupernodeAPI#ReportDynamicRate
type ReportDynamicRateFuncType func(ip string, taskID string, cid string, dynamicRate int64) (*types.BaseResponse, error)

// ClientErrorFuncType function type of SupernodeAPI#ReportClientError
type ClientErrorFuncType func(ip string, req *types.ClientErrorRequest) (*types.BaseResponse, error)

Expand All @@ -129,6 +132,7 @@ type MockSupernodeAPI struct {
ServiceDownFunc ServiceDownFuncType
ClientErrorFunc ClientErrorFuncType
ReportMetricsFunc ReportMetricsFuncType
ReportDynamicRateFunc ReportDynamicRateFuncType
}

var _ api.SupernodeAPI = &MockSupernodeAPI{}
Expand Down Expand Up @@ -169,6 +173,15 @@ func (m *MockSupernodeAPI) ServiceDown(ip string, taskID string, cid string) (
return nil, nil
}

// ReportDynamicRate implements SupernodeAPI#ReportDynamicRate.
func (m *MockSupernodeAPI) ReportDynamicRate(ip string, taskID string, cid string, dynamicRate int64) (
*types.BaseResponse, error) {
if m.ReportDynamicRateFunc != nil {
return m.ReportDynamicRateFunc(ip, taskID, cid, dynamicRate)
}
return nil, nil
}

// ReportClientError implements SupernodeAPI#ReportClientError.
func (m *MockSupernodeAPI) ReportClientError(ip string, req *types.ClientErrorRequest) (resp *types.BaseResponse, e error) {
if m.ClientErrorFunc != nil {
Expand Down
21 changes: 20 additions & 1 deletion dfget/core/uploader/peer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func newPeerServer(cfg *config.Config, port int) *peerServer {
if err != nil{
logrus.Errorf("update dynamic bandwidth fail:%s",err)
}
// break the goroutine when server is down
if s.finished == nil{
break
}
}
}()
}
Expand Down Expand Up @@ -234,6 +238,8 @@ func (ps *peerServer) checkHandler(w http.ResponseWriter, r *http.Request) {
atomic.StoreInt64(ps.totalLimitRate,int64(totalLimit))
logrus.Infof("update total limit to %d", totalLimit)
}
//update rate in supernode
ps.updateDynamicRateToSupernode(int64(totalLimit))

// get parameters
taskFileName := mux.Vars(r)["commonFile"]
Expand Down Expand Up @@ -301,11 +307,23 @@ func (ps *peerServer) updateDynamicRate()(error){
return err
}
atomic.StoreInt64(ps.totalLimitRate,dynamicRate)

ps.updateDynamicRateToSupernode(dynamicRate)
logrus.Infof("update total limit to %d", dynamicRate)
return nil
}

func (ps *peerServer) updateDynamicRateToSupernode(rate int64){
ps.syncTaskMap.Range(func(key, value interface{}) bool {
task, ok := value.(*taskConfig)
if ok {
ps.api.ReportDynamicRate(task.superNode, task.taskID, task.cid, rate)
logrus.Infof("report task %s dynamicRate change to %d",
task.taskID, rate)
}
return true
})
}

// getTaskFile finds the file and returns the File object.
func (ps *peerServer) getTaskFile(taskFileName string) (*os.File, int64, error) {
errSize := int64(-1)
Expand Down Expand Up @@ -419,6 +437,7 @@ func (ps *peerServer) uploadPiece(f *os.File, w http.ResponseWriter, up *uploadP
f.Seek(up.start, 0)
r := io.LimitReader(f, readLen)
if ps.rateLimiter != nil {
ps.rateLimiter.SetRate(ratelimiter.TransRate(atomic.LoadInt64(ps.totalLimitRate)))
lr := limitreader.NewLimitReaderWithLimiter(ps.rateLimiter, r, false)
_, e = io.CopyBuffer(w, lr, buf)
} else {
Expand Down
3 changes: 1 addition & 2 deletions dfget/core/uploader/peer_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ func (s *PeerServerTestSuite) TearDownSuite(c *check.C) {
}
}

// Todo: this test-func may cause data race, have no idea to write a no data race unit test
func (s *PeerServerTestSuite) TestGetDynamicRate(c *check.C){
func (s *PeerServerTestSuite) TestUpdateDynamicRate(c *check.C){
cfg := createConfig(s.workHome, 0)
cfg.Dynamic = true
ps := newPeerServer(cfg, 0)
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/go-openapi/swag v0.19.9
github.com/go-openapi/validate v0.19.10
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef
github.com/golang/mock v1.4.3
github.com/golang/mock v1.4.4
github.com/gorilla/context v0.0.0-20181012153548-51ce91d2eadd // indirect
github.com/gorilla/mux v1.5.0
github.com/gorilla/schema v1.1.0
Expand All @@ -36,10 +36,11 @@ require (
github.com/willf/bitset v0.0.0-20190228212526-18bd95f470f9
go.mongodb.org/mongo-driver v1.3.5 // indirect
golang.org/x/net v0.0.0-20200707034311-ab3426394381 // indirect
golang.org/x/tools v0.0.0-20200724172932-b5fc9d354d99 // indirect
golang.org/x/tools v0.0.0-20200806234136-990129eca547 // indirect
gopkg.in/gcfg.v1 v1.2.3
gopkg.in/mgo.v2 v2.0.0-20160818020120-3f83fa500528 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/warnings.v0 v0.1.2
gopkg.in/yaml.v2 v2.3.0
rsc.io/quote/v3 v3.1.0 // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ github.com/golang/mock v1.4.0 h1:Rd1kQnQu0Hq3qvJppYSG0HtP+f5LPPUiDswTLiEegLg=
github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.3 h1:GV+pQPG/EUUbkh47niozDcADz6go/dUwhVzdUQHIVRw=
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down Expand Up @@ -443,6 +445,8 @@ golang.org/x/tools v0.0.0-20200723000907-a7c6fd066f6d h1:7k9BKfwmdbykG6l5ztniTrH
golang.org/x/tools v0.0.0-20200723000907-a7c6fd066f6d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200724172932-b5fc9d354d99 h1:OHn441rq5CeM5r1xJ0OmY7lfdTvnedi6k+vQiI7G9b8=
golang.org/x/tools v0.0.0-20200724172932-b5fc9d354d99/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200806234136-990129eca547 h1:HjPd370wmFEFsBBdDEikoje9tDY3OHE7UB2erLJBmss=
golang.org/x/tools v0.0.0-20200806234136-990129eca547/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand Down
1 change: 1 addition & 0 deletions pkg/constants/dfget_super_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const (
CodeSourceError = 610
CodeGetPieceReport = 611
CodeGetPeerDown = 612
CodeUpdateDynamicRate = 613
)

/* the code of task result that dfget will report to supernode */
Expand Down
14 changes: 14 additions & 0 deletions supernode/daemon/mgr/mock/mock_progress_mgr.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions supernode/daemon/mgr/progress/progress_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/supernode/config"
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/mock"

"github.com/go-check/check"
Expand All @@ -42,17 +43,17 @@ type ProgressManagerTestSuite struct {
manager *Manager
}

func (s *SchedulerMgrTestSuite) SetUpSuite(c *check.C) {
func (s *ProgressManagerTestSuite) SetUpSuite(c *check.C) {
s.mockCtl = gomock.NewController(c)
s.mockProgressMgr = mock.NewMockProgressMgr(s.mockCtl)
s.mockProgressMgr.EXPECT().GetPeerIDsByPieceNum(gomock.Any(), gomock.Any(), gomock.Any()).Return([]string{"peerID"}, nil).AnyTimes()

cfg := config.NewConfig()
cfg.SetSuperPID("fooPid")
s.manager, _ = NewManager(cfg, s.mockProgressMgr)
s.manager, _ = NewManager(cfg)
}

func (s *SchedulerMgrTestSuite) TearDownSuite(c *check.C) {
func (s *ProgressManagerTestSuite) TearDownSuite(c *check.C) {
s.mockCtl.Finish()
}

Expand Down
25 changes: 25 additions & 0 deletions supernode/server/0.3_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"net/http"
"strconv"

"github.com/go-openapi/strfmt"
"github.com/gorilla/schema"
Expand Down Expand Up @@ -322,3 +323,27 @@ func (s *Server) fetchP2PNetworkInfo(ctx context.Context, rw http.ResponseWriter
func (s *Server) reportPeerHealth(ctx context.Context, rw http.ResponseWriter, req *http.Request) error {
return EncodeResponse(rw, http.StatusOK, &types.HeartBeatResponse{})
}

func (s *Server) updateDynamicRate(ctx context.Context, rw http.ResponseWriter, req *http.Request) (err error) {
params := req.URL.Query()
taskID := params.Get("taskId")
cID := params.Get("cid")
strDynamicRate := params.Get("dynamicRate")
//convert dynamicRate from string to int64
dynamicRate, err := strconv.ParseInt(strDynamicRate, 10, 64)
if err!=nil {
return err
}
// get peerID according to the CID and taskID
dfgetTask, err := s.DfgetTaskMgr.Get(ctx, cID, taskID)
if err != nil {
return err
}
if err := s.ProgressMgr.UpdatePeerDynamicRate(ctx,dfgetTask.PeerID,dynamicRate);err !=nil{
return err
}

return EncodeResponse(rw, http.StatusOK, &types.ResultInfo{
Code: constants.CodeUpdateDynamicRate,
})
}
1 change: 1 addition & 0 deletions supernode/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func registerLegacy(s *Server) {
{Method: http.MethodGet, Path: "/peer/piece/error", HandlerFunc: s.reportPieceError},
{Method: http.MethodPost, Path: "/peer/network", HandlerFunc: s.fetchP2PNetworkInfo},
{Method: http.MethodPost, Path: "/peer/heartbeat", HandlerFunc: s.reportPeerHealth},
{Method: http.MethodGet, Path: "/peer/dynamicrate", HandlerFunc: s.updateDynamicRate},
}
api.Legacy.Register(legacyHandlers...)
api.Legacy.Register(preheatHandlers(s)...)
Expand Down

0 comments on commit d890733

Please sign in to comment.