Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Refactors out the use of GCS so a different object store can be used #51

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 6 additions & 84 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package api

import (
"bytes"
"context"
"encoding/base64"
"encoding/gob"
"encoding/json"
Expand All @@ -32,16 +31,11 @@ import (
"net/url"
"strconv"
"strings"
"sync"

"cloud.google.com/go/storage"
"github.com/googlegenomics/htsget/internal/analytics"
"github.com/googlegenomics/htsget/internal/bam"
"github.com/googlegenomics/htsget/internal/bgzf"
"github.com/googlegenomics/htsget/internal/genomics"
"golang.org/x/oauth2"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"
)

const (
Expand All @@ -62,7 +56,7 @@ var (
// storage.Client to satisfy the incoming request. Any headers that caused this
// particular client to be created are returned to allow block requests to be
// generated correctly.
type NewStorageClientFunc func(*http.Request) (*storage.Client, http.Header, error)
type NewStorageClientFunc func(*http.Request) (Client, http.Header, error)

// Server provides an htsget protocol server. Must be created with NewServer.
type Server struct {
Expand Down Expand Up @@ -124,7 +118,7 @@ func (server *Server) serveReads(w http.ResponseWriter, req *http.Request) {
return
}

data, err := gcs.Bucket(bucket).Object(object).NewRangeReader(ctx, 0, int64(server.blockSizeLimit))
data, err := gcs.NewObjectHandle(bucket, object).NewRangeReader(ctx, 0, int64(server.blockSizeLimit))
if err != nil {
writeError(w, newStorageError("opening data", err))
return
Expand All @@ -143,8 +137,9 @@ func (server *Server) serveReads(w http.ResponseWriter, req *http.Request) {
}

request := &readsRequest{
indexObjects: []*storage.ObjectHandle{gcs.Bucket(bucket).Object(object + ".bai"),
gcs.Bucket(bucket).Object(strings.TrimSuffix(object, ".bam") + ".bai"),
indexObjects: []ObjectHandle{
gcs.NewObjectHandle(bucket, object+".bai"),
gcs.NewObjectHandle(bucket, strings.TrimSuffix(object, ".bam")+".bai"),
},
blockSizeLimit: server.blockSizeLimit,
region: region,
Expand Down Expand Up @@ -228,7 +223,7 @@ func (server *Server) serveBlocks(w http.ResponseWriter, req *http.Request) {
}

request := &blockRequest{
object: gcs.Bucket(bucket).Object(object),
object: gcs.NewObjectHandle(bucket, object),
chunk: chunk,
}

Expand Down Expand Up @@ -362,24 +357,6 @@ func newNotFoundError(context string, err error) error {
return newApiError("NotFound", http.StatusNotFound, context, err)
}

func newStorageError(context string, err error) error {
if err == errMissingOrInvalidToken {
return newPermissionDeniedError(context, err)
}
if err == storage.ErrObjectNotExist {
return newNotFoundError("object does not exist", err)
}
if err, ok := err.(*googleapi.Error); ok {
switch err.Code {
case http.StatusUnauthorized:
return newInvalidAuthenticationError(context, err)
case http.StatusForbidden:
return newPermissionDeniedError(context, err)
}
}
return err
}

// writeError writes either a JSON object or bare HTTP error describing err to
// w. A JSON object is written only when the error has a name and code defined
// by the htsget specification.
Expand All @@ -405,61 +382,6 @@ func writeJSON(w http.ResponseWriter, code int, v interface{}) {
json.NewEncoder(w).Encode(v)
}

var (
defaultStorageClient *storage.Client
initializeDefaultStorageClient sync.Once
)

func newClientWithOptions(opts ...option.ClientOption) (*storage.Client, http.Header, error) {
initializeDefaultStorageClient.Do(func() {
gcs, err := storage.NewClient(context.Background(), opts...)
if err != nil {
log.Fatalf("Creating default storage client: %v", err)
}
defaultStorageClient = gcs
})
return defaultStorageClient, nil, nil
}

// NewDefaultClient returns a storage client that uses the application default
// credentials. It caches the storage client for efficiency.
func NewDefaultClient(_ *http.Request) (*storage.Client, http.Header, error) {
return newClientWithOptions()
}

// NewPublicClient returns a storage client that does not use any form of
// client authorization. It can only be used to read publicly-readable
// objects. It caches the storage client for efficiency.
func NewPublicClient(_ *http.Request) (*storage.Client, http.Header, error) {
return newClientWithOptions(option.WithHTTPClient(http.DefaultClient))
}

// NewClientFromBearerToken constructs a storage client that uses the OAuth2
// bearer token found in req to make storage requests. It returns the
// authorization header containing the bearer token as well to allow subsequent
// requests to be authenticated correctly.
func NewClientFromBearerToken(req *http.Request) (*storage.Client, http.Header, error) {
authorization := req.Header.Get("Authorization")

fields := strings.Split(authorization, " ")
if len(fields) != 2 || fields[0] != "Bearer" {
return nil, nil, errMissingOrInvalidToken
}

token := oauth2.Token{
TokenType: fields[0],
AccessToken: fields[1],
}
client, err := storage.NewClient(req.Context(), option.WithTokenSource(oauth2.StaticTokenSource(&token)))
if err != nil {
return nil, nil, fmt.Errorf("creating client with token source: %v", err)
}

return client, map[string][]string{
"Authorization": []string{authorization},
}, nil
}

type forwardOrigin func(w http.ResponseWriter, req *http.Request)

func (f forwardOrigin) ServeHTTP(w http.ResponseWriter, req *http.Request) {
Expand Down
4 changes: 2 additions & 2 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ func testQuery(ctx context.Context, t *testing.T, url string) *http.Response {
if err != nil {
t.Fatalf("Failed to create storage client: %v", err)
}
newStorageClient := func(*http.Request) (*storage.Client, http.Header, error) {
return gcs, nil, nil
newStorageClient := func(*http.Request) (Client, http.Header, error) {
return GCSClient{gcs}, nil, nil
}

mux := http.NewServeMux()
Expand Down
3 changes: 1 addition & 2 deletions api/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import (
"io"
"io/ioutil"

"cloud.google.com/go/storage"
"github.com/googlegenomics/htsget/internal/bgzf"
)

type blockRequest struct {
object *storage.ObjectHandle
object ObjectHandle
chunk bgzf.Chunk
}

Expand Down
21 changes: 21 additions & 0 deletions api/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package api

import (
"context"
"io"
)

// Client is an interface to the storage engine.
type Client interface {
// NewObjectHandle returns a handle to a specified object in
// the storage engine.
NewObjectHandle(bucket, object string) ObjectHandle
}

// ObjectHandle is an interface to the actual storage engine in use.
type ObjectHandle interface {
// NewRangeReader returns a reader that reads from a specified
// range. Length of -1 means to capture everything until the
// end.
NewRangeReader(ctx context.Context, offset, length int64) (io.ReadCloser, error)
}
108 changes: 108 additions & 0 deletions api/gcs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package api

import (
"context"
"fmt"
"io"
"log"
"net/http"
"strings"
"sync"

"cloud.google.com/go/storage"
"golang.org/x/oauth2"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"
)

// GCSClient is Client for accessing Google Cloud Storage.
type GCSClient struct {
*storage.Client
}

// NewObjectHandle returns a handle to a specified object in the
// storage engine.
func (c GCSClient) NewObjectHandle(bucket, object string) ObjectHandle {
return gcsObjectHandle{c.Bucket(bucket).Object(object)}
}

type gcsObjectHandle struct {
*storage.ObjectHandle
}

func (h gcsObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) (io.ReadCloser, error) {
return h.ObjectHandle.NewRangeReader(ctx, offset, length)
}

var (
defaultStorageClient *storage.Client
initializeDefaultStorageClient sync.Once
)

func newClientWithOptions(opts ...option.ClientOption) (Client, http.Header, error) {
initializeDefaultStorageClient.Do(func() {
gcs, err := storage.NewClient(context.Background(), opts...)
if err != nil {
log.Fatalf("Creating default storage client: %v", err)
}
defaultStorageClient = gcs
})
return GCSClient{defaultStorageClient}, nil, nil
}

// NewDefaultClient returns a storage client that uses the application default
// credentials. It caches the storage client for efficiency.
func NewDefaultClient(_ *http.Request) (Client, http.Header, error) {
return newClientWithOptions()
}

// NewPublicClient returns a storage client that does not use any form of
// client authorization. It can only be used to read publicly-readable
// objects. It caches the storage client for efficiency.
func NewPublicClient(_ *http.Request) (Client, http.Header, error) {
return newClientWithOptions(option.WithHTTPClient(http.DefaultClient))
}

// NewClientFromBearerToken constructs a storage client that uses the OAuth2
// bearer token found in req to make storage requests. It returns the
// authorization header containing the bearer token as well to allow subsequent
// requests to be authenticated correctly.
func NewClientFromBearerToken(req *http.Request) (Client, http.Header, error) {
authorization := req.Header.Get("Authorization")

fields := strings.Split(authorization, " ")
if len(fields) != 2 || fields[0] != "Bearer" {
return nil, nil, errMissingOrInvalidToken
}

token := oauth2.Token{
TokenType: fields[0],
AccessToken: fields[1],
}
client, err := storage.NewClient(req.Context(), option.WithTokenSource(oauth2.StaticTokenSource(&token)))
if err != nil {
return nil, nil, fmt.Errorf("creating client with token source: %v", err)
}

return GCSClient{client}, map[string][]string{
"Authorization": []string{authorization},
}, nil
}

func newStorageError(context string, err error) error {
if err == errMissingOrInvalidToken {
return newPermissionDeniedError(context, err)
}
if err == storage.ErrObjectNotExist {
return newNotFoundError("object does not exist", err)
}
if err, ok := err.(*googleapi.Error); ok {
switch err.Code {
case http.StatusUnauthorized:
return newInvalidAuthenticationError(context, err)
case http.StatusForbidden:
return newPermissionDeniedError(context, err)
}
}
return err
}
8 changes: 4 additions & 4 deletions api/reads.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,24 @@ package api
import (
"context"
"fmt"
"io"

"cloud.google.com/go/storage"
"github.com/googlegenomics/htsget/internal/bam"
"github.com/googlegenomics/htsget/internal/bgzf"
"github.com/googlegenomics/htsget/internal/genomics"
)

type readsRequest struct {
indexObjects []*storage.ObjectHandle
indexObjects []ObjectHandle
blockSizeLimit uint64
region genomics.Region
}

func (req *readsRequest) handle(ctx context.Context) ([]*bgzf.Chunk, error) {
var index *storage.Reader
var index io.ReadCloser
var err error
for _, object := range req.indexObjects {
index, err = object.NewReader(ctx)
index, err = object.NewRangeReader(ctx, 0, -1)
if err == nil {
break
}
Expand Down
3 changes: 1 addition & 2 deletions appengine/htsget.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"os"
"strings"

"cloud.google.com/go/storage"
"github.com/googlegenomics/htsget/api"
"google.golang.org/appengine"
)
Expand All @@ -20,6 +19,6 @@ func init() {
http.HandleFunc("/", mux.ServeHTTP)
}

func newAppEngineClient(req *http.Request) (*storage.Client, http.Header, error) {
func newAppEngineClient(req *http.Request) (api.Client, http.Header, error) {
return api.NewClientFromBearerToken(req.WithContext(appengine.NewContext(req)))
}