Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

job logs microservice #30

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/go-chi/render v1.0.1
github.com/go-git/go-git/v5 v5.2.0
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.1.2
github.com/gorilla/mux v1.8.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OI
github.com/google/pprof v0.0.0-20191218002539-d4f498aebedc/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
Expand Down
55 changes: 55 additions & 0 deletions pkg/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package models
import (
"errors"
"github.com/go-git/go-git/v5/plumbing"
log "github.com/sirupsen/logrus"
"net/http"
"time"
)
Expand Down Expand Up @@ -69,3 +70,57 @@ func (u *User) Bind(*http.Request) error {

return nil
}

// Job describes a single package build job tracked by the job logs service.
type Job struct {
	// PackageName is the name of the package being built.
	PackageName string
	Status BuildStatus
	// Logs is omitted from JSON when empty; Render additionally trims it
	// to the last logsToKeep lines before a job is sent over HTTP.
	Logs BuildLog `json:",omitempty"`
	// Uuid identifies the job in the store and in /job/{uuid} routes.
	Uuid string
	// Time records when the job was created.
	Time time.Time
}

// logsToKeep are the number of log lines to keep when sending a job.
const logsToKeep = 10

// Render prepares a Job for an HTTP response by trimming its log down to
// the final logsToKeep lines. The complete log stays available through the
// /job/{uuid}/logs route; shipping every line — especially when listing
// *all* jobs — would make responses needlessly large.
//
// NOTE(review): the value receiver means this truncation happens on a copy
// of the Job; confirm the render pipeline serializes the same value that
// Render mutated, otherwise the full log is still sent.
func (j Job) Render(w http.ResponseWriter, r *http.Request) error {
	if excess := len(j.Logs) - logsToKeep; excess > 0 {
		j.Logs = j.Logs[excess:]
	}

	return nil
}

// BuildStatus enumerates the lifecycle states of a build Job.
type BuildStatus int

const (
	// Pipeline states in execution order (iota: 0..4).
	BuildStatusPending BuildStatus = iota
	BuildStatusPullingRepo
	BuildStatusRunning
	BuildStatusUploading
	BuildStatusDone

	// BuildStatusErrored marks a failed build. The blank line above is
	// cosmetic only — iota keeps counting, so this has value 5.
	BuildStatusErrored
)

// LogLine is a single timestamped entry in a job's build log.
type LogLine struct {
	// Time is when the line was emitted.
	Time time.Time
	// Level is the severity of the line.
	Level log.Level
	// Message holds the actual log text. It must be exported: the previous
	// unexported `message` field was silently skipped by both encoding/gob
	// (used to persist jobs in the store) and encoding/json (used for API
	// responses), so log lines were stored and served without their text.
	Message string
}

// Bind implements render.Binder. A LogLine needs no post-decode
// processing, so this always succeeds.
func (j LogLine) Bind(r *http.Request) error {
	return nil
}

// BuildLog is the ordered sequence of log lines produced by a build job.
type BuildLog []LogLine

// Render implements render.Renderer. A LogLine is sent as-is, so no
// pre-render processing is required.
func (j LogLine) Render(w http.ResponseWriter, r *http.Request) error {
	return nil
}
217 changes: 217 additions & 0 deletions pkg/store/jobstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package store

import (
"bytes"
"encoding/gob"
"github.com/dgraph-io/badger/v2"
"github.com/finitum/AAAAA/pkg/models"
"github.com/google/uuid"
"github.com/pkg/errors"
"sync"
"time"
)

type JobStoreWrapper struct {
*Badger
jdonszelmann marked this conversation as resolved.
Show resolved Hide resolved

sync.Mutex
callbacks map[string][]func(line *models.LogLine)
}

// NewJobStore wraps the given Badger store in a JobStoreWrapper with an
// empty listener table.
func NewJobStore(badger *Badger) JobStore {
	wrapper := &JobStoreWrapper{Badger: badger}
	wrapper.callbacks = map[string][]func(line *models.LogLine){}
	return wrapper
}

// NewJob creates a pending Job for the named package, persists it under a
// freshly generated uuid with the job TTL, and returns it.
func (b *JobStoreWrapper) NewJob(name string) (*models.Job, error) {
	id, err := uuid.NewUUID()
	if err != nil {
		return nil, errors.Wrap(err, "uuid")
	}

	job := models.Job{
		PackageName: name,
		Status:      models.BuildStatusPending,
		Logs:        nil,
		Uuid:        id.String(),
		Time:        time.Now(),
	}

	return &job, b.db.Update(func(txn *badger.Txn) error {
		var encoded bytes.Buffer
		if err := gob.NewEncoder(&encoded).Encode(job); err != nil {
			return errors.Wrap(err, "gob encode")
		}

		entry := badger.NewEntry([]byte(jobPrefix+id.String()), encoded.Bytes()).WithTTL(jobTTL)
		return errors.Wrap(txn.SetEntry(entry), "badger transaction")
	})
}

// AppendToJobLog notifies every registered log listener for jid and then
// appends l to the job's persisted log. Returns ErrNotExists if no job
// with that uuid is stored.
func (b *JobStoreWrapper) AppendToJobLog(jid string, l *models.LogLine) error {
	// Snapshot the listener slice under the mutex: AddLogListener mutates
	// b.callbacks concurrently and unsynchronized map access is a data
	// race (the original read the map without locking).
	b.Lock()
	listeners := append([]func(line *models.LogLine)(nil), b.callbacks[jid]...)
	b.Unlock()

	// Invoke callbacks outside the lock so a slow listener cannot block
	// listener registration.
	for _, cb := range listeners {
		cb(l)
	}

	return b.db.Update(func(txn *badger.Txn) error {
		var job models.Job

		// Get the job
		item, err := txn.Get([]byte(jobPrefix + jid))
		if err == badger.ErrKeyNotFound {
			return ErrNotExists
		} else if err != nil {
			return errors.Wrap(err, "badger get")
		}
		err = item.Value(func(val []byte) error {
			return errors.Wrap(gob.NewDecoder(bytes.NewBuffer(val)).Decode(&job), "gob decode")
		})
		if err != nil {
			return err
		}

		// Update the job
		job.Logs = append(job.Logs, *l)

		// Put the job back, refreshing the TTL.
		var value bytes.Buffer
		if err := gob.NewEncoder(&value).Encode(job); err != nil {
			return errors.Wrap(err, "gob encode")
		}

		entry := badger.NewEntry([]byte(jobPrefix+jid), value.Bytes()).WithTTL(jobTTL)
		return errors.Wrap(txn.SetEntry(entry), "badger transaction")
	})
}

// SetJobStatus overwrites the persisted status of the job identified by
// jid. Returns ErrNotExists if no job with that uuid is stored.
func (b *JobStoreWrapper) SetJobStatus(jid string, status models.BuildStatus) error {
	key := []byte(jobPrefix + jid)

	return b.db.Update(func(txn *badger.Txn) error {
		// Fetch the stored job.
		item, err := txn.Get(key)
		switch {
		case err == badger.ErrKeyNotFound:
			return ErrNotExists
		case err != nil:
			return errors.Wrap(err, "badger get")
		}

		var job models.Job
		if err := item.Value(func(val []byte) error {
			return errors.Wrap(gob.NewDecoder(bytes.NewBuffer(val)).Decode(&job), "gob decode")
		}); err != nil {
			return err
		}

		// Apply the new status and write the job back, refreshing the TTL.
		job.Status = status

		var encoded bytes.Buffer
		if err := gob.NewEncoder(&encoded).Encode(job); err != nil {
			return errors.Wrap(err, "gob encode")
		}

		entry := badger.NewEntry(key, encoded.Bytes()).WithTTL(jobTTL)
		return errors.Wrap(txn.SetEntry(entry), "badger transaction")
	})
}

// GetLogs fetches the complete build log of the job identified by jid.
// Returns ErrNotExists if no job with that uuid is stored.
func (b *JobStoreWrapper) GetLogs(jid string) (logs []models.LogLine, _ error) {
	return logs, b.db.View(func(txn *badger.Txn) error {
		// Fetch the stored job.
		item, err := txn.Get([]byte(jobPrefix + jid))
		switch {
		case err == badger.ErrKeyNotFound:
			return ErrNotExists
		case err != nil:
			return errors.Wrap(err, "badger get")
		}

		var job models.Job
		if err := item.Value(func(val []byte) error {
			return errors.Wrap(gob.NewDecoder(bytes.NewBuffer(val)).Decode(&job), "gob decode")
		}); err != nil {
			return err
		}

		logs = job.Logs
		return nil
	})
}

// GetJobs returns every job currently stored under the job key prefix.
// A decode failure aborts the iteration and is returned to the caller.
func (b *JobStoreWrapper) GetJobs() (jobs []models.Job, _ error) {
	return jobs, b.db.View(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.DefaultIteratorOptions)
		defer it.Close()

		prefix := []byte(jobPrefix)
		for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
			var job models.Job
			err := it.Item().Value(func(val []byte) error {
				return errors.Wrap(gob.NewDecoder(bytes.NewBuffer(val)).Decode(&job), "gob decode")
			})
			// Check the decode error *before* appending: the original
			// appended first, which pushed a zero-value Job into the
			// result slice right before bailing out with the error.
			if err != nil {
				return errors.Wrap(err, "badger iteration")
			}
			jobs = append(jobs, job)
		}
		return nil
	})
}

// AddLogListener registers cb to be invoked for every future log line
// appended to the job identified by uuid. Safe for concurrent use.
func (b *JobStoreWrapper) AddLogListener(uuid string, cb func(line *models.LogLine)) {
	b.Lock()
	b.callbacks[uuid] = append(b.callbacks[uuid], cb)
	b.Unlock()
}

// GetJob fetches the job identified by jid from the store.
// Returns ErrNotExists if no job with that uuid is stored.
func (b *JobStoreWrapper) GetJob(jid string) (*models.Job, error) {
	var job models.Job

	return &job, b.db.View(func(txn *badger.Txn) error {
		// Fetch the stored job.
		item, err := txn.Get([]byte(jobPrefix + jid))
		switch {
		case err == badger.ErrKeyNotFound:
			return ErrNotExists
		case err != nil:
			return errors.Wrap(err, "badger get")
		}

		return item.Value(func(val []byte) error {
			return errors.Wrap(gob.NewDecoder(bytes.NewBuffer(val)).Decode(&job), "gob decode")
		})
	})
}
32 changes: 32 additions & 0 deletions pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Store interface {
}

const pkgPrefix = "pkg_"

type PackageStore interface {
// GetPackage gets a package definition from the store MUST return ErrNotExists if the package does not exist
GetPackage(name string) (*models.Pkg, error)
Expand All @@ -32,6 +33,7 @@ type PackageStore interface {
}

const userPrefix = "user_"

type UserStore interface {
// GetUser gets a user from the store MUST return ErrNotExists if the user does not exist
GetUser(name string) (*models.User, error)
Expand Down Expand Up @@ -88,3 +90,33 @@ func GetPartialCacheEntry(cache Cache, term string) (aur.Results, bool, error) {

return nil, false, ErrNotExists
}

// jobPrefix namespaces job entries in the shared badger keyspace,
// alongside pkgPrefix and userPrefix.
const jobPrefix = "job_"

// Keep job logs for 10 days: jobTTL is the badger TTL applied to every
// job entry, after which the job and its logs expire from the store.
const jobTTL = 10 * 24 * time.Hour

// JobStore persists build jobs and their logs, and fans new log lines out
// to registered listeners.
type JobStore interface {
	// NewJob creates a new job. It returns the newly created job, with in it the
	// uuid of the job which can be used for further lookup.
	NewJob(name string) (*models.Job, error)

	// AppendToJobLog appends a line to a job's log.
	// MUST return ErrNotExists if the job does not exist.
	AppendToJobLog(uuid string, l *models.LogLine) error

	// SetJobStatus updates the status of this job.
	// MUST return ErrNotExists if the job does not exist.
	SetJobStatus(uuid string, status models.BuildStatus) error

	// GetLogs returns the entire log of this job.
	// MUST return ErrNotExists if the job does not exist.
	GetLogs(uuid string) ([]models.LogLine, error)

	// GetJobs returns all jobs currently in the store.
	GetJobs() ([]models.Job, error)

	// AddLogListener takes a function which will be called every time a new
	// log line is added to the job targeted with the uuid.
	AddLogListener(uuid string, cb func(line *models.LogLine))

	// GetJob gets a job by uuid.
	// MUST return ErrNotExists if the job does not exist.
	GetJob(uuid string) (*models.Job, error)
}
Loading