diff --git a/pkg/nftset_utils/handler.go b/pkg/nftset_utils/handler.go index 3b79afb42..a0ba41405 100644 --- a/pkg/nftset_utils/handler.go +++ b/pkg/nftset_utils/handler.go @@ -121,7 +121,7 @@ func (h *NftSetHandler) AddElems(es ...netip.Prefix) error { if set.Interval { start := e.Masked().Addr() elems = append(elems, nftables.SetElement{Key: start.AsSlice(), IntervalEnd: false}) - + end := netipx.PrefixLastIP(e).Next() // may be invalid if end is overflowed if end.IsValid() { elems = append(elems, nftables.SetElement{Key: end.AsSlice(), IntervalEnd: true}) diff --git a/pkg/upstream/transport/conn_quic.go b/pkg/upstream/transport/conn_quic.go index 1427c383b..680371ccc 100644 --- a/pkg/upstream/transport/conn_quic.go +++ b/pkg/upstream/transport/conn_quic.go @@ -120,4 +120,4 @@ func (ote *quicReservedExchanger) WithdrawReserved() { s := ote.stream s.CancelRead(_DOQ_REQUEST_CANCELLED) s.CancelWrite(_DOQ_REQUEST_CANCELLED) -} \ No newline at end of file +} diff --git a/plugin/enabled_plugins.go b/plugin/enabled_plugins.go index 764811fc7..2d19752d7 100644 --- a/plugin/enabled_plugins.go +++ b/plugin/enabled_plugins.go @@ -71,4 +71,7 @@ import ( _ "github.com/IrineSistiana/mosdns/v5/plugin/server/quic_server" _ "github.com/IrineSistiana/mosdns/v5/plugin/server/tcp_server" _ "github.com/IrineSistiana/mosdns/v5/plugin/server/udp_server" + + // statistics + _ "github.com/IrineSistiana/mosdns/v5/plugin/statistics/simple" ) diff --git a/plugin/executable/arbitrary/arbitrary.go b/plugin/executable/arbitrary/arbitrary.go index 90e40109d..51f0f58ef 100644 --- a/plugin/executable/arbitrary/arbitrary.go +++ b/plugin/executable/arbitrary/arbitrary.go @@ -23,12 +23,14 @@ import ( "bytes" "context" "fmt" + "os" + "strings" + "github.com/IrineSistiana/mosdns/v5/coremain" "github.com/IrineSistiana/mosdns/v5/pkg/query_context" "github.com/IrineSistiana/mosdns/v5/pkg/zone_file" "github.com/IrineSistiana/mosdns/v5/plugin/executable/sequence" - "os" - "strings" + "github.com/IrineSistiana/mosdns/v5/plugin/statistics" ) const PluginType = "arbitrary" @@ -72,6 +74,7 @@ func NewArbitrary(args *Args) (*Arbitrary, error) { func (a *Arbitrary) Exec(_ context.Context, qCtx *query_context.Context) error { if r := a.m.Reply(qCtx.Q()); r != nil { qCtx.SetResponse(r) + qCtx.StoreValue(statistics.ArbitraryStoreKey, true) } return nil } diff --git a/plugin/executable/cache/cache.go b/plugin/executable/cache/cache.go index bc001127b..fbc3a1a59 100644 --- a/plugin/executable/cache/cache.go +++ b/plugin/executable/cache/cache.go @@ -38,6 +38,7 @@ import ( "github.com/IrineSistiana/mosdns/v5/pkg/query_context" "github.com/IrineSistiana/mosdns/v5/pkg/utils" "github.com/IrineSistiana/mosdns/v5/plugin/executable/sequence" + "github.com/IrineSistiana/mosdns/v5/plugin/statistics" "github.com/go-chi/chi/v5" "github.com/klauspost/compress/gzip" "github.com/miekg/dns" @@ -202,6 +203,7 @@ func (c *Cache) Exec(ctx context.Context, qCtx *query_context.Context, next sequ } if cachedResp != nil { // cache hit c.hitTotal.Inc() + qCtx.StoreValue(statistics.CacaheStoreKey, cachedResp.Id) cachedResp.Id = q.Id // change msg id qCtx.SetResponse(cachedResp) } diff --git a/plugin/executable/forward/forward.go b/plugin/executable/forward/forward.go index 8a024e63f..e7c7b3194 100644 --- a/plugin/executable/forward/forward.go +++ b/plugin/executable/forward/forward.go @@ -33,6 +33,7 @@ import ( "github.com/IrineSistiana/mosdns/v5/pkg/upstream" "github.com/IrineSistiana/mosdns/v5/pkg/utils" "github.com/IrineSistiana/mosdns/v5/plugin/executable/sequence" + "github.com/IrineSistiana/mosdns/v5/plugin/statistics" "github.com/miekg/dns" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -193,11 +194,12 @@ func (f *Forward) RegisterMetricsTo(r prometheus.Registerer) error { } func (f *Forward) Exec(ctx context.Context, qCtx *query_context.Context) (err error) { - r, err := f.exchange(ctx, qCtx, f.us) + r, name, err := f.exchange(ctx, qCtx, f.us) if err != nil { return err } qCtx.SetResponse(r) + qCtx.StoreValue(statistics.ForwardStoreKey, name) return nil } @@ -216,11 +218,12 @@ func (f *Forward) QuickConfigureExec(args string) (any, error) { } } var execFunc sequence.ExecutableFunc = func(ctx context.Context, qCtx *query_context.Context) error { - r, err := f.exchange(ctx, qCtx, us) + r, name, err := f.exchange(ctx, qCtx, us) if err != nil { return err } qCtx.SetResponse(r) + qCtx.StoreValue(statistics.ForwardStoreKey, name) return nil } return execFunc, nil @@ -233,14 +236,14 @@ func (f *Forward) Close() error { return nil } -func (f *Forward) exchange(ctx context.Context, qCtx *query_context.Context, us []*upstreamWrapper) (*dns.Msg, error) { +func (f *Forward) exchange(ctx context.Context, qCtx *query_context.Context, us []*upstreamWrapper) (*dns.Msg, string, error) { if len(us) == 0 { - return nil, errors.New("no upstream to exchange") + return nil, "", errors.New("no upstream to exchange") } queryPayload, err := pool.PackBuffer(qCtx.Q()) if err != nil { - return nil, err + return nil, "", err } defer pool.ReleaseBuf(queryPayload) @@ -255,6 +258,7 @@ func (f *Forward) exchange(ctx context.Context, qCtx *query_context.Context, us type res struct { r *dns.Msg err error + u string } resChan := make(chan res) @@ -291,7 +295,7 @@ func (f *Forward) exchange(ctx context.Context, qCtx *query_context.Context, us } } select { - case resChan <- res{r: r, err: err}: + case resChan <- res{r: r, u: u.name(), err: err}: case <-done: } }(qCtx.Id(), qCtx.QQuestion()) @@ -309,12 +313,12 @@ func (f *Forward) exchange(ctx context.Context, qCtx *query_context.Context, us if i < concurrent-1 && r.Rcode != dns.RcodeSuccess && r.Rcode != dns.RcodeNameError { continue } - return r, nil + return r, res.u, nil case <-ctx.Done(): - return nil, context.Cause(ctx) + return nil, "", context.Cause(ctx) } } - return nil, errors.New("all upstream servers failed") + return nil, "", errors.New("all upstream servers failed") } func quickSetup(bq sequence.BQ, s string) (any, error) { diff --git a/plugin/executable/hosts/hosts.go b/plugin/executable/hosts/hosts.go index de405bb86..9535c16db 100644 --- a/plugin/executable/hosts/hosts.go +++ b/plugin/executable/hosts/hosts.go @@ -23,13 +23,15 @@ import ( "bytes" "context" "fmt" + "os" + "github.com/IrineSistiana/mosdns/v5/coremain" "github.com/IrineSistiana/mosdns/v5/pkg/hosts" "github.com/IrineSistiana/mosdns/v5/pkg/matcher/domain" "github.com/IrineSistiana/mosdns/v5/pkg/query_context" "github.com/IrineSistiana/mosdns/v5/plugin/executable/sequence" + "github.com/IrineSistiana/mosdns/v5/plugin/statistics" "github.com/miekg/dns" - "os" ) const PluginType = "hosts" @@ -84,6 +86,7 @@ func (h *Hosts) Exec(_ context.Context, qCtx *query_context.Context) error { r := h.h.LookupMsg(qCtx.Q()) if r != nil { qCtx.SetResponse(r) + qCtx.StoreValue(statistics.HostStoreKey, true) } return nil } diff --git a/plugin/executable/sequence/fallback/fallback.go b/plugin/executable/sequence/fallback/fallback.go index 7145a8fff..301f3262d 100644 --- a/plugin/executable/sequence/fallback/fallback.go +++ b/plugin/executable/sequence/fallback/fallback.go @@ -29,6 +29,7 @@ import ( "github.com/IrineSistiana/mosdns/v5/pkg/pool" "github.com/IrineSistiana/mosdns/v5/pkg/query_context" "github.com/IrineSistiana/mosdns/v5/plugin/executable/sequence" + "github.com/IrineSistiana/mosdns/v5/plugin/statistics" "github.com/miekg/dns" "go.uber.org/zap" ) @@ -131,6 +132,7 @@ func (f *fallback) doFallback(ctx context.Context, qCtx *query_context.Context) close(primDone) respChan <- r } + qCtx.StoreValue(statistics.FallbackStoreKey, uint(1)) }() // Secondary goroutine. @@ -168,6 +170,7 @@ func (f *fallback) doFallback(ctx context.Context, qCtx *query_context.Context) } } respChan <- r + qCtx.StoreValue(statistics.FallbackStoreKey, uint(2)) }() for i := 0; i < 2; i++ { diff --git a/plugin/statistics/key.go b/plugin/statistics/key.go new file mode 100644 index 000000000..c60b7bfbf --- /dev/null +++ b/plugin/statistics/key.go @@ -0,0 +1,11 @@ +package statistics + +import "github.com/IrineSistiana/mosdns/v5/pkg/query_context" + +var ( + ArbitraryStoreKey = query_context.RegKey() + HostStoreKey = query_context.RegKey() + ForwardStoreKey = query_context.RegKey() + FallbackStoreKey = query_context.RegKey() + CacaheStoreKey = query_context.RegKey() +) diff --git a/plugin/statistics/simple/api.go b/plugin/statistics/simple/api.go new file mode 100644 index 000000000..cef9b425a --- /dev/null +++ b/plugin/statistics/simple/api.go @@ -0,0 +1,52 @@ +package simple + +import ( + "container/list" + "net/http" + "time" + + "github.com/go-chi/chi/v5" + "go.uber.org/zap" +) + +func (c *simpleServer) Api() *chi.Mux { + r := chi.NewRouter() + r.Get("/statistics", c.statistics) + return r +} + +func (c *simpleServer) statistics(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.WriteHeader(200) + flusher, ok := w.(http.Flusher) + if !ok { + w.Write([]byte("Unsupported connection method")) + return + } + + var elem *list.Element // maybe nil + var lastElem *list.Element + for { + select { + case <-r.Context().Done(): + return + default: + if lastElem != nil { + elem = lastElem.Next() + } else { + elem = c.backend.Front() + } + + if elem != nil { + lastElem = elem + if _, err := w.Write(append(elem.Value.([]byte), '\n')); err != nil { + c.logger.Warn("Http write error", zap.Error(err)) + return + } + flusher.Flush() + } else { + time.Sleep(time.Second) + } + } + } +} diff --git a/plugin/statistics/simple/exec.go b/plugin/statistics/simple/exec.go new file mode 100644 index 000000000..c1f67ce3e --- /dev/null +++ b/plugin/statistics/simple/exec.go @@ -0,0 +1,74 @@ +package simple + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "time" + + "github.com/IrineSistiana/mosdns/v5/pkg/query_context" + "github.com/IrineSistiana/mosdns/v5/plugin/executable/sequence" + "go.uber.org/zap" +) + +var ( + httpClient = &http.Client{ + Transport: &http.Transport{ + TLSHandshakeTimeout: time.Second * 5, + IdleConnTimeout: time.Second * 60, + MaxIdleConns: 5, + MaxIdleConnsPerHost: 5, + DisableCompression: true, + }, + } +) + +func (m *simpleServer) Exec(ctx context.Context, qCtx *query_context.Context, next sequence.ChainWalker) (err error) { + record := recordPool.Get().(*record) + defer record.release() + + record.SetQuery(qCtx) + + err = next.ExecNext(ctx, qCtx) + record.Err = err + + if r := qCtx.R(); r != nil { + record.SetResp(qCtx) + } + + bd, _ := json.Marshal(record) + go m.push(bd) + return +} + +func (m *simpleServer) push(d []byte) { + if m.backend.Len() > m.args.Size { + _ = m.backend.Remove(m.backend.Front()) + } + m.backend.PushBack(d) + m.send(d) +} + +func (m *simpleServer) send(d []byte) (err error) { + if m.args.WebHook == "" { + return + } + resp, err := httpClient.Post(m.args.WebHook, "application/json", bytes.NewBuffer(d)) + if err != nil { + m.logger.Debug("HTTP request failed", zap.Error(err)) + return + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + m.logger.Debug("Failed to read response", zap.Error(err)) + return + } + defer resp.Body.Close() // Close the response body. + if resp.StatusCode > 299 { + m.logger.Debug("WebHook sent incorrect data", zap.String("status", resp.Status), zap.String("body", string(body))) + } + return +} diff --git a/plugin/statistics/simple/init.go b/plugin/statistics/simple/init.go new file mode 100644 index 000000000..90ade3853 --- /dev/null +++ b/plugin/statistics/simple/init.go @@ -0,0 +1,86 @@ +package simple + +/* + * Copyright (C) 2020-2022, IrineSistiana + * + * This file is part of mosdns. + * + * mosdns is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * mosdns is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +import ( + "container/list" + "time" + + "github.com/IrineSistiana/mosdns/v5/coremain" + "github.com/IrineSistiana/mosdns/v5/pkg/utils" + "github.com/IrineSistiana/mosdns/v5/plugin/executable/sequence" + "go.uber.org/zap" +) + +const PluginType = "statistics_simple" + +var _ sequence.RecursiveExecutable = (*simpleServer)(nil) + +func init() { + coremain.RegNewPluginFunc(PluginType, Init, func() any { return new(Args) }) +} + +// Args is the arguments of plugin. It will be decoded from yaml. +// So it is recommended to use `yaml` as struct field's tag. +type Args struct { + Size int `yaml:"size"` + WebHook string `yaml:"web_hook"` + WebHookTimeout int `yaml:"web_hook_timeout"` +} + +func (a *Args) init() { + utils.SetDefaultUnsignNum(&a.Size, 128) + utils.SetDefaultUnsignNum(&a.WebHookTimeout, 5) +} + +type simpleServer struct { + args *Args + logger *zap.Logger + + backend *list.List +} + +func Init(bp *coremain.BP, args any) (any, error) { + ss, err := NewUiServer(args.(*Args), bp.L()) + if err != nil { + return nil, err + } + bp.RegAPI(ss.Api()) + return ss, nil +} + +func NewUiServer(args *Args, l *zap.Logger) (ss *simpleServer, err error) { + args.init() + + httpClient.Timeout = time.Second * time.Duration(args.WebHookTimeout) + + logger := l + if logger == nil { + logger = zap.NewNop() + } + + ss = &simpleServer{ + backend: list.New(), + args: args, + logger: logger, + } + + return +} diff --git a/plugin/statistics/simple/record.go b/plugin/statistics/simple/record.go new file mode 100644 index 000000000..b8813afac --- /dev/null +++ b/plugin/statistics/simple/record.go @@ -0,0 +1,181 @@ +package simple + +import ( + "sync" + "time" + + "github.com/IrineSistiana/mosdns/v5/pkg/query_context" + "github.com/IrineSistiana/mosdns/v5/plugin/statistics" + "github.com/miekg/dns" +) + +var recordPool = sync.Pool{ + New: func() interface{} { + return &record{} + }, +} + +type record struct { + ID uint32 + StartAt time.Time + ClientIP string + + IsArbitrary bool `json:",omitempty"` + IsHost bool `json:",omitempty"` + ForwardName string `json:",omitempty"` + Fallback uint `json:",omitempty"` + CacheID uint16 `json:",omitempty"` + + Consuming string + + Op string `json:",omitempty"` + Status string `json:",omitempty"` + + QOpt string `json:",omitempty"` + ClientOpt string `json:",omitempty"` + Query map[string][]any + + RespOpt string `json:",omitempty"` + UpstreamOpt string `json:",omitempty"` + Resp map[string][]any `json:",omitempty"` + + Err error `json:",omitempty"` +} + +func (m *record) release() { + m.IsArbitrary = false + m.IsHost = false + m.ForwardName = "" + m.Fallback = 0 + m.CacheID = 0 + + m.QOpt = "" + m.ClientOpt = "" + + recordPool.Put(m) +} + +func (m *record) SetQuery(qCtx *query_context.Context) { + m.ID = qCtx.Id() + m.StartAt = qCtx.StartTime() + m.ClientIP = qCtx.ServerMeta.ClientAddr.String() + if qCtx.QOpt() != nil { + m.QOpt = qCtx.QOpt().String() + } + if qCtx.ClientOpt() != nil { + m.ClientOpt = qCtx.ClientOpt().String() + } + + m.Query = makeDnsMsg(qCtx.Q()) +} + +func (m *record) SetResp(qCtx *query_context.Context) { + m.Resp = makeDnsMsg(qCtx.R()) + + if qCtx.RespOpt() != nil { + m.RespOpt = qCtx.RespOpt().String() + } + if qCtx.ClientOpt() != nil { + m.ClientOpt = qCtx.ClientOpt().String() + } + + if isArbitrary, has := qCtx.GetValue(statistics.ArbitraryStoreKey); has { + m.IsArbitrary = isArbitrary.(bool) + } + if isHost, has := qCtx.GetValue(statistics.HostStoreKey); has { + m.IsHost = isHost.(bool) + } + if forwardName, has := qCtx.GetValue(statistics.ForwardStoreKey); has { + m.ForwardName = forwardName.(string) + } + if fallback, has := qCtx.GetValue(statistics.FallbackStoreKey); has { + m.Fallback = fallback.(uint) + } + if cacheID, has := qCtx.GetValue(statistics.CacaheStoreKey); has { + m.CacheID = cacheID.(uint16) + } + + m.Consuming = time.Since(qCtx.StartTime()).String() + + m.Status = dns.RcodeToString[qCtx.R().Rcode] + m.Op = dns.OpcodeToString[qCtx.R().Opcode] +} + +func makeHflags(h dns.MsgHdr) (flags []any) { + flags = []any{} + if h.Response { + flags = append(flags, "qr") + } + if h.Authoritative { + flags = append(flags, "aa") + } + if h.Truncated { + flags = append(flags, "tc") + } + if h.RecursionDesired { + flags = append(flags, "rd") + } + if h.RecursionAvailable { + flags = append(flags, "ra") + } + if h.Zero { // Hmm + flags = append(flags, "z") + } + if h.AuthenticatedData { + flags = append(flags, "ad") + } + if h.CheckingDisabled { + flags = append(flags, "aa") + } + return +} + +func makeDnsMsg(m *dns.Msg) (data map[string][]any) { + data = make(map[string][]any) + data["flags"] = makeHflags(m.MsgHdr) + if len(m.Question) > 0 { + data["Question"] = []any{} + for _, q := range m.Question { + data["Question"] = append(data["Question"], makeQuestion(q)) + } + } + if len(m.Answer) > 0 { + data["Answer"] = []any{} + for _, q := range m.Answer { + data["Answer"] = append(data["Answer"], makeRR(q)) + } + } + if len(m.Ns) > 0 { + data["Ns"] = []any{} + for _, ns := range m.Ns { + data["Ns"] = append(data["Ns"], makeRR(ns)) + } + } + if len(m.Extra) > 0 { + data["Extra"] = []any{} + for _, extra := range m.Extra { + data["Extra"] = append(data["Extra"], makeRR(extra)) + } + } + return +} + +func makeQuestion(q dns.Question) map[string]string { + return map[string]string{ + "Name": q.Name, + "Type": dns.TypeToString[q.Qtype], + "Class": dns.ClassToString[q.Qclass], + } +} + +func makeRR(q dns.RR) map[string]any { + h := q.Header() + return map[string]any{ + "name": h.Name, + "ttl": h.Ttl, + "Type": dns.TypeToString[h.Rrtype], + "Class": dns.ClassToString[h.Class], + "Header": h.String(), + "value": q.String(), + } +}