Skip to content

Commit

Permalink
blob compression + composition version
Browse files Browse the repository at this point in the history
  • Loading branch information
mandelsoft committed Oct 25, 2023
1 parent 09cb473 commit 17795ca
Show file tree
Hide file tree
Showing 18 changed files with 569 additions and 33 deletions.
188 changes: 188 additions & 0 deletions pkg/common/accessio/blobaccess/compress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and Open Component Model contributors.
//
// SPDX-License-Identifier: Apache-2.0

package blobaccess

import (
"bytes"
"compress/gzip"
"io"
"sync"

"github.com/opencontainers/go-digest"

"github.com/open-component-model/ocm/pkg/common/accessio/blobaccess/spi"
compression2 "github.com/open-component-model/ocm/pkg/common/compression"
"github.com/open-component-model/ocm/pkg/errors"
"github.com/open-component-model/ocm/pkg/mime"
)

////////////////////////////////////////////////////////////////////////////////

type compression struct {
blob BlobAccess
}

var _ spi.BlobAccessBase = (*compression)(nil)

func (c *compression) Close() error {
return c.blob.Close()
}

func (c *compression) Get() ([]byte, error) {
r, err := c.blob.Reader()
if err != nil {
return nil, err
}
defer r.Close()
rr, _, err := compression2.AutoDecompress(r)
if err != nil {
return nil, err
}
buf := bytes.NewBuffer(nil)

w := gzip.NewWriter(buf)
_, err = io.Copy(w, rr)
w.Close()
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}

type reader struct {
wait sync.WaitGroup
io.ReadCloser
err error
}

func (r *reader) Close() error {
err := r.ReadCloser.Close()
r.wait.Wait()
return errors.Join(err, r.err)
}

func (c *compression) Reader() (io.ReadCloser, error) {
r, err := c.blob.Reader()
if err != nil {
return nil, err
}
defer r.Close()
rr, _, err := compression2.AutoDecompress(r)
if err != nil {
return nil, err
}
pr, pw := io.Pipe()
cw := gzip.NewWriter(pw)

outr := &reader{ReadCloser: pr}
outr.wait.Add(1)

go func() {
_, err := io.Copy(cw, rr)
outr.err = errors.Join(err, cw.Close(), pw.Close())
outr.wait.Done()
}()
return outr, nil
}

func (c *compression) Digest() digest.Digest {
return BLOB_UNKNOWN_DIGEST
}

func (c *compression) MimeType() string {
m := c.blob.MimeType()
if mime.IsGZip(m) {
return m
}
return m + "+gzip"
}

func (c *compression) DigestKnown() bool {
return false
}

func (c *compression) Size() int64 {
return BLOB_UNKNOWN_SIZE
}

func WithCompression(blob BlobAccess) (BlobAccess, error) {
b, err := blob.Dup()
if err != nil {
return nil, err
}
return spi.NewBlobAccessForBase(&compression{
blob: b,
}), nil
}

////////////////////////////////////////////////////////////////////////////////

type decompression struct {
blob BlobAccess
}

var _ spi.BlobAccessBase = (*decompression)(nil)

func (c *decompression) Close() error {
return c.blob.Close()
}

func (c *decompression) Get() ([]byte, error) {
r, err := c.blob.Reader()
if err != nil {
return nil, err
}
defer r.Close()
rr, _, err := compression2.AutoDecompress(r)
if err != nil {
return nil, err
}
buf := bytes.NewBuffer(nil)
_, err = io.Copy(buf, rr)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func (c *decompression) Reader() (io.ReadCloser, error) {
r, err := c.blob.Reader()
if err != nil {
return nil, err
}
defer r.Close()
rr, _, err := compression2.AutoDecompress(r)
return rr, err
}

func (c *decompression) Digest() digest.Digest {
return BLOB_UNKNOWN_DIGEST
}

func (c *decompression) MimeType() string {
m := c.blob.MimeType()
if !mime.IsGZip(m) {
return m
}
return m[:len(m)-5]
}

func (c *decompression) DigestKnown() bool {
return false
}

func (c *decompression) Size() int64 {
return BLOB_UNKNOWN_SIZE
}

func WithDecompression(blob BlobAccess) (BlobAccess, error) {
b, err := blob.Dup()
if err != nil {
return nil, err
}
return spi.NewBlobAccessForBase(&decompression{
blob: b,
}), nil
}
85 changes: 85 additions & 0 deletions pkg/common/accessio/blobaccess/compress_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and Open Component Model contributors.
//
// SPDX-License-Identifier: Apache-2.0

package blobaccess_test

import (
"bytes"
"compress/gzip"
"io"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
. "github.com/open-component-model/ocm/pkg/testutils"

"github.com/open-component-model/ocm/pkg/common/accessio/blobaccess"
"github.com/open-component-model/ocm/pkg/mime"
)

var _ = Describe("temp file management", func() {

Context("compress", func() {
It("compress access", func() {
blob := blobaccess.ForString(mime.MIME_TEXT, "testdata")
defer blob.Close()

comp := Must(blobaccess.WithCompression(blob))
defer comp.Close()

Expect(comp.MimeType()).To(Equal(mime.MIME_TEXT + "+gzip"))
data := Must(comp.Get())
Expect(len(data)).To(Not(Equal(8)))

uncomp := Must(io.ReadAll(Must(gzip.NewReader(bytes.NewReader(data)))))
Expect(string(uncomp)).To(Equal("testdata"))
})

It("compress reader access", func() {
blob := blobaccess.ForString(mime.MIME_TEXT, "testdata")
defer blob.Close()

comp := Must(blobaccess.WithCompression(blob))
defer comp.Close()

r := Must(comp.Reader())
data := Must(io.ReadAll(r))
Expect(len(data)).To(Not(Equal(8)))

uncomp := Must(io.ReadAll(Must(gzip.NewReader(bytes.NewReader(data)))))
Expect(string(uncomp)).To(Equal("testdata"))
})
})

Context("uncompress", func() {
buf := bytes.NewBuffer(nil)
cw := gzip.NewWriter(buf)
MustBeSuccessful(io.WriteString(cw, "testdata"))
cw.Close()

It("uncompress access", func() {
blob := blobaccess.ForData(mime.MIME_TEXT+"+gzip", buf.Bytes())
defer blob.Close()

comp := Must(blobaccess.WithDecompression(blob))
defer comp.Close()
Expect(comp.MimeType()).To(Equal(mime.MIME_TEXT))

data := Must(comp.Get())
Expect(string(data)).To(Equal("testdata"))
})

It("compress reader access", func() {
blob := blobaccess.ForData(mime.MIME_TEXT+"+gzip", buf.Bytes())
defer blob.Close()

comp := Must(blobaccess.WithDecompression(blob))
defer comp.Close()

r := Must(comp.Reader())
data := Must(io.ReadAll(r))
Expect(string(data)).To(Equal("testdata"))
})
})

})
4 changes: 4 additions & 0 deletions pkg/common/accessio/blobaccess/spi/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ func (s *staticBlobAccess) Dup() (BlobAccess, error) {
return s, nil
}

func (s *staticBlobAccess) Close() error {
return nil
}

// ForStaticDataAccess is used for a data access using no closer.
// They don't require a finalization and can be used
// as long as they exist. Therefore, no ref counting
Expand Down
7 changes: 5 additions & 2 deletions pkg/common/accessio/blobaccess/spi/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func (b *blobAccessView) base() BlobAccessBase {
return b.access
}

func (b *blobAccessView) Close() error {
return b.View.Close()
}

func (b *blobAccessView) Validate() error {
return utils.ValidateObject(b.access)
}
Expand All @@ -41,9 +45,8 @@ func (b *blobAccessView) Get() (result []byte, err error) {
return result, b.Execute(func() error {
result, err = b.access.Get()
if err != nil {
return fmt.Errorf("unable to get access: %w", err)
return err
}

return nil
})
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/common/accessio/blobaccess/standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ func (b *blobprovider) Close() error {
// as long as the given blob access is not closed.
// If required the blob can be closed with the additionally
// provided Close method.
// ATTENTION: the underlying BlobAccess wil not be closed
// as long as the provider is not closed, but the BlobProvider
// interface is no io.Closer.
// To be on the safe side, this method should only be called
// with static blob access, featuring a NOP closer without
// anny attached external resources, which should be released.
func ProviderForBlobAccess(blob BlobAccess) *blobprovider {
return &blobprovider{blob}
}
Expand Down
23 changes: 23 additions & 0 deletions pkg/common/accessio/refmgmt/refcloser.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,23 @@ type LazyMode interface {
Lazy()
}

// ToLazy resets the main view flag
// of closer views to enable
// dark release of resources even if the
// first/main view has been closed.
// Otherwise, closing the main view will
// fail, if there are still subsequent views.
func ToLazy[T any](o T, err error) (T, error) {
if err == nil {
Lazy(o)
}
return o, err
}

func Lazy(o interface{}) bool {
if o == nil {
return false
}
if l, ok := o.(LazyMode); ok {
l.Lazy()
return true
Expand Down Expand Up @@ -125,6 +141,9 @@ func (v *view) Execute(f func() error) error {
return f()
}

// Release will release the view.
// With releasing the last view
// the underlying object will be closed.
func (v *view) Release() error {
v.lock.Lock()
defer v.lock.Unlock()
Expand All @@ -135,6 +154,10 @@ func (v *view) Release() error {
return v.ref.Unref()
}

// Finalize will try to finalize the
// underlying object. This is only
// possible if no further view is
// still pending.
func (v *view) Finalize() error {
v.lock.Lock()
defer v.lock.Unlock()
Expand Down
Loading

0 comments on commit 17795ca

Please sign in to comment.