Skip to content

Commit

Permalink
feat(task): Optimize Container Log Reading (#7575)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengkunwang223 authored Dec 27, 2024
1 parent 8fe40b7 commit 554d2ab
Show file tree
Hide file tree
Showing 16 changed files with 314 additions and 511 deletions.
69 changes: 24 additions & 45 deletions agent/app/api/v2/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/1Panel-dev/1Panel/agent/app/api/v2/helper"
"github.com/1Panel-dev/1Panel/agent/app/dto"
"github.com/1Panel-dev/1Panel/agent/global"
"github.com/gin-gonic/gin"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -470,34 +469,6 @@ func (b *BaseApi) Inspect(c *gin.Context) {
helper.SuccessWithData(c, result)
}

// @Tags Container
// @Summary Container logs
// @Description 容器日志
// @Param container query string false "容器名称"
// @Param since query string false "时间筛选"
// @Param follow query string false "是否追踪"
// @Param tail query string false "显示行号"
// @Security ApiKeyAuth
// @Router /containers/search/log [post]
func (b *BaseApi) ContainerLogs(c *gin.Context) {
wsConn, err := upGrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
global.LOG.Errorf("gin context http handler failed, err: %v", err)
return
}
defer wsConn.Close()

container := c.Query("container")
since := c.Query("since")
follow := c.Query("follow") == "true"
tail := c.Query("tail")

if err := containerService.ContainerLogs(wsConn, "container", container, since, tail, follow); err != nil {
_ = wsConn.WriteMessage(1, []byte(err.Error()))
return
}
}

// @Description 下载容器日志
// @Router /containers/download/log [post]
func (b *BaseApi) DownloadContainerLogs(c *gin.Context) {
Expand Down Expand Up @@ -707,30 +678,38 @@ func (b *BaseApi) ComposeUpdate(c *gin.Context) {
helper.SuccessWithData(c, nil)
}

// @Tags Container Compose
// @Summary Container Compose logs
// @Description docker-compose 日志
// @Param compose query string false "compose 文件地址"
// @Tags Container
// @Summary Container logs
// @Description 容器日志
// @Param container query string false "容器名称"
// @Param since query string false "时间筛选"
// @Param follow query string false "是否追踪"
// @Param tail query string false "显示行号"
// @Security ApiKeyAuth
// @Router /containers/compose/search/log [get]
func (b *BaseApi) ComposeLogs(c *gin.Context) {
wsConn, err := upGrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
global.LOG.Errorf("gin context http handler failed, err: %v", err)
return
}
defer wsConn.Close()
// @Router /containers/search/log [post]
func (b *BaseApi) ContainerStreamLogs(c *gin.Context) {
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("Transfer-Encoding", "chunked")

compose := c.Query("compose")
since := c.Query("since")
follow := c.Query("follow") == "true"
tail := c.Query("tail")

if err := containerService.ContainerLogs(wsConn, "compose", compose, since, tail, follow); err != nil {
_ = wsConn.WriteMessage(1, []byte(err.Error()))
return
container := c.Query("container")
compose := c.Query("compose")
streamLog := dto.StreamLog{
Compose: compose,
Container: container,
Since: since,
Follow: follow,
Tail: tail,
Type: "container",
}
if compose != "" {
streamLog.Type = "compose"
}

containerService.StreamLogs(c, streamLog)
}
9 changes: 9 additions & 0 deletions agent/app/dto/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,12 @@ type ContainerLog struct {
Tail uint `json:"tail"`
ContainerType string `json:"containerType"`
}

type StreamLog struct {
Compose string
Container string
Since string
Follow bool
Tail string
Type string
}
144 changes: 71 additions & 73 deletions agent/app/service/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"io"
"net/http"
"net/url"
Expand All @@ -19,9 +20,6 @@ import (
"sync"
"syscall"
"time"
"unicode/utf8"

"github.com/gin-gonic/gin"

"github.com/pkg/errors"

Expand All @@ -44,7 +42,6 @@ import (
"github.com/docker/docker/api/types/volume"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
"github.com/gorilla/websocket"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/mem"
Expand Down Expand Up @@ -74,7 +71,6 @@ type IContainerService interface {
ContainerCommit(req dto.ContainerCommit) error
ContainerLogClean(req dto.OperationWithName) error
ContainerOperation(req dto.ContainerOperation) error
ContainerLogs(wsConn *websocket.Conn, containerType, container, since, tail string, follow bool) error
DownloadContainerLogs(containerType, container, since, tail string, c *gin.Context) error
ContainerStats(id string) (*dto.ContainerStats, error)
Inspect(req dto.InspectReq) (string, error)
Expand All @@ -87,6 +83,8 @@ type IContainerService interface {
Prune(req dto.ContainerPrune) (dto.ContainerPruneReport, error)

LoadContainerLogs(req dto.OperationWithNameAndType) string

StreamLogs(ctx *gin.Context, params dto.StreamLog)
}

func NewIContainerService() IContainerService {
Expand Down Expand Up @@ -794,87 +792,87 @@ func (u *ContainerService) ContainerLogClean(req dto.OperationWithName) error {
return nil
}

func (u *ContainerService) ContainerLogs(wsConn *websocket.Conn, containerType, container, since, tail string, follow bool) error {
defer func() { wsConn.Close() }()
if cmd.CheckIllegal(container, since, tail) {
return buserr.New(constant.ErrCmdIllegal)
func (u *ContainerService) StreamLogs(ctx *gin.Context, params dto.StreamLog) {
messageChan := make(chan string, 1024)
errorChan := make(chan error, 1)

go collectLogs(params, messageChan, errorChan)

ctx.Stream(func(w io.Writer) bool {
select {
case msg, ok := <-messageChan:
if !ok {
return false
}
_, err := fmt.Fprintf(w, "data: %v\n\n", msg)
if err != nil {
return false
}
return true
case err := <-errorChan:
_, err = fmt.Fprintf(w, "data: {\"event\": \"error\", \"data\": \"%s\"}\n\n", err.Error())
if err != nil {
return false
}
return false
case <-ctx.Request.Context().Done():
return false
}
})
}

func collectLogs(params dto.StreamLog, messageChan chan<- string, errorChan chan<- error) {
defer close(messageChan)
defer close(errorChan)

var cmdArgs []string
if params.Type == "compose" {
cmdArgs = []string{"compose", "-f", params.Compose}
}
commandName := "docker"
commandArg := []string{"logs", container}
if containerType == "compose" {
commandArg = []string{"compose", "-f", container, "logs"}
cmdArgs = append(cmdArgs, "logs")
if params.Follow {
cmdArgs = append(cmdArgs, "-f")
}
if tail != "0" {
commandArg = append(commandArg, "--tail")
commandArg = append(commandArg, tail)
if params.Tail != "all" {
cmdArgs = append(cmdArgs, "--tail", params.Tail)
}
if since != "all" {
commandArg = append(commandArg, "--since")
commandArg = append(commandArg, since)
if params.Since != "all" {
cmdArgs = append(cmdArgs, "--since", params.Since)
}
if follow {
commandArg = append(commandArg, "-f")
}
if !follow {
cmd := exec.Command(commandName, commandArg...)
cmd.Stderr = cmd.Stdout
stdout, _ := cmd.CombinedOutput()
if !utf8.Valid(stdout) {
return errors.New("invalid utf8")
}
if err := wsConn.WriteMessage(websocket.TextMessage, stdout); err != nil {
global.LOG.Errorf("send message with log to ws failed, err: %v", err)
}
return nil
if params.Container != "" {
cmdArgs = append(cmdArgs, params.Container)
}
cmd := exec.Command("docker", cmdArgs...)

cmd := exec.Command(commandName, commandArg...)
stdout, err := cmd.StdoutPipe()
if err != nil {
_ = cmd.Process.Signal(syscall.SIGTERM)
return err
errorChan <- fmt.Errorf("failed to get stdout pipe: %v", err)
return
}
cmd.Stderr = cmd.Stdout
if err := cmd.Start(); err != nil {
_ = cmd.Process.Signal(syscall.SIGTERM)
return err
errorChan <- fmt.Errorf("failed to start command: %v", err)
return
}
exitCh := make(chan struct{})
go func() {
_, wsData, _ := wsConn.ReadMessage()
if string(wsData) == "close conn" {
_ = cmd.Process.Signal(syscall.SIGTERM)
exitCh <- struct{}{}
}
}()

go func() {
buffer := make([]byte, 1024)
for {
select {
case <-exitCh:
return
default:
n, err := stdout.Read(buffer)
if err != nil {
if err == io.EOF {
return
}
global.LOG.Errorf("read bytes from log failed, err: %v", err)
return
}
if !utf8.Valid(buffer[:n]) {
continue
}
if err = wsConn.WriteMessage(websocket.TextMessage, buffer[:n]); err != nil {
global.LOG.Errorf("send message with log to ws failed, err: %v", err)
return
}
}
scanner := bufio.NewScanner(stdout)
lineNumber := 0

for scanner.Scan() {
lineNumber++
message := scanner.Text()
select {
case messageChan <- message:
case <-time.After(time.Second):
errorChan <- fmt.Errorf("message channel blocked")
return
}
}()
_ = cmd.Wait()
return nil
}

if err := scanner.Err(); err != nil {
errorChan <- fmt.Errorf("scanner error: %v", err)
return
}
cmd.Wait()
}

func (u *ContainerService) DownloadContainerLogs(containerType, container, since, tail string, c *gin.Context) error {
Expand Down
17 changes: 16 additions & 1 deletion agent/init/migration/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ import (
)

func Init() {
InitAgentDB()
InitTaskDB()
global.LOG.Info("Migration run successfully")
}

func InitAgentDB() {
m := gormigrate.New(global.DB, gormigrate.DefaultOptions, []*gormigrate.Migration{
migrations.AddTable,
migrations.AddMonitorTable,
Expand All @@ -20,5 +26,14 @@ func Init() {
global.LOG.Error(err)
panic(err)
}
global.LOG.Info("Migration run successfully")
}

func InitTaskDB() {
m := gormigrate.New(global.TaskDB, gormigrate.DefaultOptions, []*gormigrate.Migration{
migrations.AddTaskTable,
})
if err := m.Migrate(); err != nil {
global.LOG.Error(err)
panic(err)
}
}
9 changes: 9 additions & 0 deletions agent/init/migration/migrations/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,12 @@ var InitPHPExtensions = &gormigrate.Migration{
return nil
},
}

var AddTaskTable = &gormigrate.Migration{
ID: "20241226-add-task",
Migrate: func(tx *gorm.DB) error {
return tx.AutoMigrate(
&model.Task{},
)
},
}
3 changes: 1 addition & 2 deletions agent/router/ro_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (s *ContainerRouter) InitRouter(Router *gin.RouterGroup) {
baRouter.POST("/list", baseApi.ListContainer)
baRouter.GET("/status", baseApi.LoadContainerStatus)
baRouter.GET("/list/stats", baseApi.ContainerListStats)
baRouter.GET("/search/log", baseApi.ContainerLogs)
baRouter.GET("/search/log", baseApi.ContainerStreamLogs)
baRouter.POST("/download/log", baseApi.DownloadContainerLogs)
baRouter.GET("/limit", baseApi.LoadResourceLimit)
baRouter.POST("/clean/log", baseApi.CleanContainerLog)
Expand All @@ -46,7 +46,6 @@ func (s *ContainerRouter) InitRouter(Router *gin.RouterGroup) {
baRouter.POST("/compose/test", baseApi.TestCompose)
baRouter.POST("/compose/operate", baseApi.OperatorCompose)
baRouter.POST("/compose/update", baseApi.ComposeUpdate)
baRouter.GET("/compose/search/log", baseApi.ComposeLogs)

baRouter.GET("/template", baseApi.ListComposeTemplate)
baRouter.POST("/template/search", baseApi.SearchComposeTemplate)
Expand Down
3 changes: 2 additions & 1 deletion frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@
"vue-codemirror": "^6.1.1",
"vue-demi": "^0.14.6",
"vue-i18n": "^9.13.1",
"vue-router": "^4.3.3"
"vue-router": "^4.3.3",
"vue-virtual-scroller": "^2.0.0-beta.8"
},
"devDependencies": {
"@types/node": "^20.14.8",
Expand Down
Loading

0 comments on commit 554d2ab

Please sign in to comment.