Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(task): Optimize Container Log Reading #7575

Merged
merged 1 commit into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 24 additions & 45 deletions agent/app/api/v2/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/1Panel-dev/1Panel/agent/app/api/v2/helper"
"github.com/1Panel-dev/1Panel/agent/app/dto"
"github.com/1Panel-dev/1Panel/agent/global"
"github.com/gin-gonic/gin"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -470,34 +469,6 @@ func (b *BaseApi) Inspect(c *gin.Context) {
helper.SuccessWithData(c, result)
}

// @Tags Container
// @Summary Container logs
// @Description 容器日志
// @Param container query string false "容器名称"
// @Param since query string false "时间筛选"
// @Param follow query string false "是否追踪"
// @Param tail query string false "显示行号"
// @Security ApiKeyAuth
// @Router /containers/search/log [post]
func (b *BaseApi) ContainerLogs(c *gin.Context) {
wsConn, err := upGrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
global.LOG.Errorf("gin context http handler failed, err: %v", err)
return
}
defer wsConn.Close()

container := c.Query("container")
since := c.Query("since")
follow := c.Query("follow") == "true"
tail := c.Query("tail")

if err := containerService.ContainerLogs(wsConn, "container", container, since, tail, follow); err != nil {
_ = wsConn.WriteMessage(1, []byte(err.Error()))
return
}
}

// @Description 下载容器日志
// @Router /containers/download/log [post]
func (b *BaseApi) DownloadContainerLogs(c *gin.Context) {
Expand Down Expand Up @@ -707,30 +678,38 @@ func (b *BaseApi) ComposeUpdate(c *gin.Context) {
helper.SuccessWithData(c, nil)
}

// @Tags Container Compose
// @Summary Container Compose logs
// @Description docker-compose 日志
// @Param compose query string false "compose 文件地址"
// @Tags Container
// @Summary Container logs
// @Description 容器日志
// @Param container query string false "容器名称"
// @Param since query string false "时间筛选"
// @Param follow query string false "是否追踪"
// @Param tail query string false "显示行号"
// @Security ApiKeyAuth
// @Router /containers/compose/search/log [get]
func (b *BaseApi) ComposeLogs(c *gin.Context) {
wsConn, err := upGrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
global.LOG.Errorf("gin context http handler failed, err: %v", err)
return
}
defer wsConn.Close()
// @Router /containers/search/log [post]
func (b *BaseApi) ContainerStreamLogs(c *gin.Context) {
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("Transfer-Encoding", "chunked")

compose := c.Query("compose")
since := c.Query("since")
follow := c.Query("follow") == "true"
tail := c.Query("tail")

if err := containerService.ContainerLogs(wsConn, "compose", compose, since, tail, follow); err != nil {
_ = wsConn.WriteMessage(1, []byte(err.Error()))
return
container := c.Query("container")
compose := c.Query("compose")
streamLog := dto.StreamLog{
Compose: compose,
Container: container,
Since: since,
Follow: follow,
Tail: tail,
Type: "container",
}
if compose != "" {
streamLog.Type = "compose"
}

containerService.StreamLogs(c, streamLog)
}
9 changes: 9 additions & 0 deletions agent/app/dto/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,12 @@ type ContainerLog struct {
Tail uint `json:"tail"`
ContainerType string `json:"containerType"`
}

type StreamLog struct {
Compose string
Container string
Since string
Follow bool
Tail string
Type string
}
144 changes: 71 additions & 73 deletions agent/app/service/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"io"
"net/http"
"net/url"
Expand All @@ -19,9 +20,6 @@ import (
"sync"
"syscall"
"time"
"unicode/utf8"

"github.com/gin-gonic/gin"

"github.com/pkg/errors"

Expand All @@ -44,7 +42,6 @@ import (
"github.com/docker/docker/api/types/volume"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
"github.com/gorilla/websocket"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/mem"
Expand Down Expand Up @@ -74,7 +71,6 @@ type IContainerService interface {
ContainerCommit(req dto.ContainerCommit) error
ContainerLogClean(req dto.OperationWithName) error
ContainerOperation(req dto.ContainerOperation) error
ContainerLogs(wsConn *websocket.Conn, containerType, container, since, tail string, follow bool) error
DownloadContainerLogs(containerType, container, since, tail string, c *gin.Context) error
ContainerStats(id string) (*dto.ContainerStats, error)
Inspect(req dto.InspectReq) (string, error)
Expand All @@ -87,6 +83,8 @@ type IContainerService interface {
Prune(req dto.ContainerPrune) (dto.ContainerPruneReport, error)

LoadContainerLogs(req dto.OperationWithNameAndType) string

StreamLogs(ctx *gin.Context, params dto.StreamLog)
}

func NewIContainerService() IContainerService {
Expand Down Expand Up @@ -794,87 +792,87 @@ func (u *ContainerService) ContainerLogClean(req dto.OperationWithName) error {
return nil
}

func (u *ContainerService) ContainerLogs(wsConn *websocket.Conn, containerType, container, since, tail string, follow bool) error {
defer func() { wsConn.Close() }()
if cmd.CheckIllegal(container, since, tail) {
return buserr.New(constant.ErrCmdIllegal)
func (u *ContainerService) StreamLogs(ctx *gin.Context, params dto.StreamLog) {
messageChan := make(chan string, 1024)
errorChan := make(chan error, 1)

go collectLogs(params, messageChan, errorChan)

ctx.Stream(func(w io.Writer) bool {
select {
case msg, ok := <-messageChan:
if !ok {
return false
}
_, err := fmt.Fprintf(w, "data: %v\n\n", msg)
if err != nil {
return false
}
return true
case err := <-errorChan:
_, err = fmt.Fprintf(w, "data: {\"event\": \"error\", \"data\": \"%s\"}\n\n", err.Error())
if err != nil {
return false
}
return false
case <-ctx.Request.Context().Done():
return false
}
})
}

func collectLogs(params dto.StreamLog, messageChan chan<- string, errorChan chan<- error) {
defer close(messageChan)
defer close(errorChan)

var cmdArgs []string
if params.Type == "compose" {
cmdArgs = []string{"compose", "-f", params.Compose}
}
commandName := "docker"
commandArg := []string{"logs", container}
if containerType == "compose" {
commandArg = []string{"compose", "-f", container, "logs"}
cmdArgs = append(cmdArgs, "logs")
if params.Follow {
cmdArgs = append(cmdArgs, "-f")
}
if tail != "0" {
commandArg = append(commandArg, "--tail")
commandArg = append(commandArg, tail)
if params.Tail != "all" {
cmdArgs = append(cmdArgs, "--tail", params.Tail)
}
if since != "all" {
commandArg = append(commandArg, "--since")
commandArg = append(commandArg, since)
if params.Since != "all" {
cmdArgs = append(cmdArgs, "--since", params.Since)
}
if follow {
commandArg = append(commandArg, "-f")
}
if !follow {
cmd := exec.Command(commandName, commandArg...)
cmd.Stderr = cmd.Stdout
stdout, _ := cmd.CombinedOutput()
if !utf8.Valid(stdout) {
return errors.New("invalid utf8")
}
if err := wsConn.WriteMessage(websocket.TextMessage, stdout); err != nil {
global.LOG.Errorf("send message with log to ws failed, err: %v", err)
}
return nil
if params.Container != "" {
cmdArgs = append(cmdArgs, params.Container)
}
cmd := exec.Command("docker", cmdArgs...)

cmd := exec.Command(commandName, commandArg...)
stdout, err := cmd.StdoutPipe()
if err != nil {
_ = cmd.Process.Signal(syscall.SIGTERM)
return err
errorChan <- fmt.Errorf("failed to get stdout pipe: %v", err)
return
}
cmd.Stderr = cmd.Stdout
if err := cmd.Start(); err != nil {
_ = cmd.Process.Signal(syscall.SIGTERM)
return err
errorChan <- fmt.Errorf("failed to start command: %v", err)
return
}
exitCh := make(chan struct{})
go func() {
_, wsData, _ := wsConn.ReadMessage()
if string(wsData) == "close conn" {
_ = cmd.Process.Signal(syscall.SIGTERM)
exitCh <- struct{}{}
}
}()

go func() {
buffer := make([]byte, 1024)
for {
select {
case <-exitCh:
return
default:
n, err := stdout.Read(buffer)
if err != nil {
if err == io.EOF {
return
}
global.LOG.Errorf("read bytes from log failed, err: %v", err)
return
}
if !utf8.Valid(buffer[:n]) {
continue
}
if err = wsConn.WriteMessage(websocket.TextMessage, buffer[:n]); err != nil {
global.LOG.Errorf("send message with log to ws failed, err: %v", err)
return
}
}
scanner := bufio.NewScanner(stdout)
lineNumber := 0

for scanner.Scan() {
lineNumber++
message := scanner.Text()
select {
case messageChan <- message:
case <-time.After(time.Second):
errorChan <- fmt.Errorf("message channel blocked")
return
}
}()
_ = cmd.Wait()
return nil
}

if err := scanner.Err(); err != nil {
errorChan <- fmt.Errorf("scanner error: %v", err)
return
}
cmd.Wait()
}

func (u *ContainerService) DownloadContainerLogs(containerType, container, since, tail string, c *gin.Context) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code is missing some crucial parts such as imports statements and function definitions. Please provide these information so I can give you a proper analysis.

In summary:

  • There is a missing import statement at line 18.

To improve the code in general, I think it would be more beneficial if we could have all comments included under the section where they fit best to avoid confusion over different areas of the codebase needing comment updates later.

I suggest this refactoring:

// This file contains custom implementations for specific Docker services interfaces.
package docker

import (
    // Add any necessary package import here.
)

type ComposeService interface{}

func NewComposeService() ComposeService { return &composeService{} }

This approach will ensure clearer documentation of each service's methods and types across the file. To address the issue regarding the incomplete code, I added a simple placeholder that reflects no implementation was found.

Remember, always keep an eye on potential future additions while reviewing the changes.

Expand Down
17 changes: 16 additions & 1 deletion agent/init/migration/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ import (
)

func Init() {
InitAgentDB()
InitTaskDB()
global.LOG.Info("Migration run successfully")
}

func InitAgentDB() {
m := gormigrate.New(global.DB, gormigrate.DefaultOptions, []*gormigrate.Migration{
migrations.AddTable,
migrations.AddMonitorTable,
Expand All @@ -20,5 +26,14 @@ func Init() {
global.LOG.Error(err)
panic(err)
}
global.LOG.Info("Migration run successfully")
}

func InitTaskDB() {
m := gormigrate.New(global.TaskDB, gormigrate.DefaultOptions, []*gormigrate.Migration{
migrations.AddTaskTable,
})
if err := m.Migrate(); err != nil {
global.LOG.Error(err)
panic(err)
}
}
9 changes: 9 additions & 0 deletions agent/init/migration/migrations/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,12 @@ var InitPHPExtensions = &gormigrate.Migration{
return nil
},
}

var AddTaskTable = &gormigrate.Migration{
ID: "20241226-add-task",
Migrate: func(tx *gorm.DB) error {
return tx.AutoMigrate(
&model.Task{},
)
},
}
3 changes: 1 addition & 2 deletions agent/router/ro_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (s *ContainerRouter) InitRouter(Router *gin.RouterGroup) {
baRouter.POST("/list", baseApi.ListContainer)
baRouter.GET("/status", baseApi.LoadContainerStatus)
baRouter.GET("/list/stats", baseApi.ContainerListStats)
baRouter.GET("/search/log", baseApi.ContainerLogs)
baRouter.GET("/search/log", baseApi.ContainerStreamLogs)
baRouter.POST("/download/log", baseApi.DownloadContainerLogs)
baRouter.GET("/limit", baseApi.LoadResourceLimit)
baRouter.POST("/clean/log", baseApi.CleanContainerLog)
Expand All @@ -46,7 +46,6 @@ func (s *ContainerRouter) InitRouter(Router *gin.RouterGroup) {
baRouter.POST("/compose/test", baseApi.TestCompose)
baRouter.POST("/compose/operate", baseApi.OperatorCompose)
baRouter.POST("/compose/update", baseApi.ComposeUpdate)
baRouter.GET("/compose/search/log", baseApi.ComposeLogs)

baRouter.GET("/template", baseApi.ListComposeTemplate)
baRouter.POST("/template/search", baseApi.SearchComposeTemplate)
Expand Down
3 changes: 2 additions & 1 deletion frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@
"vue-codemirror": "^6.1.1",
"vue-demi": "^0.14.6",
"vue-i18n": "^9.13.1",
"vue-router": "^4.3.3"
"vue-router": "^4.3.3",
"vue-virtual-scroller": "^2.0.0-beta.8"
},
"devDependencies": {
"@types/node": "^20.14.8",
Expand Down
Loading
Loading