Skip to content

Commit

Permalink
feature/create-vwap-service
Browse files Browse the repository at this point in the history
  • Loading branch information
trsmarc committed Sep 21, 2021
1 parent d75b519 commit 85cae55
Show file tree
Hide file tree
Showing 21 changed files with 534 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.DS_Store
9 changes: 9 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM golang:1.17-alpine
ENV TIMEZONE Asia/Bangkok

WORKDIR /vwap-engine
COPY . .

RUN apk add build-base
RUN go mod download
RUN go mod vendor
17 changes: 17 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!sh
APP_NAME:=vwap-engine

run:
go run .

tests:
go test -race -v ./...

docker-build:
- docker build -t ${APP_NAME} .

docker-run: docker-build
- docker run -it ${APP_NAME} go run .

docker-test: docker-build
- docker run -it ${APP_NAME} go test -race -v ./...
70 changes: 69 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,69 @@
# vwap-calculation-engine
# Volume-weighted average price (VWAMP) calculation engine

- [About the project](#about-the-project)
- [Design](#design)
- [Getting started](#getting-started)
- [Layout](#layout)
- [Notes](#notes)


## About the project
The goal of this project is to create a real-time VWAP (volume-weighted average price) calculation engine pull coinbase websocket feed to stream in trade executions and update the VWAP for each trading pair
as updates become available.

### Design

This project follows the [Clean architecture](https://blog.cleancoder.com/uncle-bob/2012/08/13/the-clean-architecture.html) convention.

## Getting started

### Prerequisites
- installed [Golang 1.17](https://golang.org/)
- or run using [Docker](https://www.docker.com/)

### Start application
Run using Go
```sh
make run // to start application
make tests // to run all the tests
```
Run using Docker
```sh
make docker-build
make docker-run
make docker-test
```


### Layout

```tree
├── .gitignore
├── CHANGELOG.md
├── Makefile
├── README.md
├── service
│   └── vamp.service.go
├── release
│   ├── template-admin.yaml
│   └── template-controller.yaml
├── test
│   ├── README.md
│   └── test_make.sh
└── external
   ├── notifier
   └── provider
```

A brief description of the layout:

* `.gitignore` varies per project, but all projects need to ignore `bin` directory.
* `CHANGELOG.md` contains auto-generated changelog information.
* `README.md` is a detailed description of the project.
* `pkg` places most of project business logic.
* `test` holds all tests.
* `external` for all external services i.e. feed data provider and notifier.

## Notes

* Makefile **MUST NOT** change well-defined command semantics, see Makefile for details.
9 changes: 9 additions & 0 deletions config/app.config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package config

type AppConfig struct {
MaxWindowSize int
}

var App = AppConfig{
MaxWindowSize: 200,
}
17 changes: 17 additions & 0 deletions config/coinbase.config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package config

type CoinbaseConfig struct {
WebsocketEndpoint string
MatchesChannelName string
MessageSubscribeType string
MessageSubscriptionsType string
MessageErrorType string
}

var Coinbase = CoinbaseConfig{
WebsocketEndpoint: "wss://ws-feed.pro.coinbase.com",
MatchesChannelName: "matches",
MessageSubscribeType: "subscribe",
MessageSubscriptionsType: "subscriptions",
MessageErrorType: "error",
}
16 changes: 16 additions & 0 deletions domain/pair.domain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package domain

import "fmt"

type Pair struct {
From string
To string
}

func NewPair(from string, to string) Pair {
return Pair{From: from, To: to}
}

func (t Pair) String() string {
return fmt.Sprintf("%s-%s", t.From, t.To)
}
23 changes: 23 additions & 0 deletions domain/queue.domain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package domain

type Queue struct {
tradings []Trading
}

func NewQueue() *Queue {
return &Queue{tradings: make([]Trading, 0)}
}

func (t *Queue) Add(trading Trading) {
t.tradings = append(t.tradings, trading)
}

func (t *Queue) Remove() Trading {
var first = t.tradings[0]
t.tradings = t.tradings[1:]
return first
}

func (t *Queue) Len() int {
return len(t.tradings)
}
30 changes: 30 additions & 0 deletions domain/trading.domain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package domain

import (
"strconv"
"time"

"github.com/pkg/errors"
)

type Trading struct {
ID int
Pair string
Share float64
Price float64
CreatedAt time.Time
}

func NewTrading(tradeID int, pair string, size string, price string, createdAt time.Time) (Trading, error) {
shareNumber, err := strconv.ParseFloat(size, 64)
if err != nil {
return Trading{}, errors.Wrap(err, "unable to convert size to float64")
}

priceNumber, err := strconv.ParseFloat(price, 64)
if err != nil {
return Trading{}, errors.Wrap(err, "unable to convert price to float64")
}

return Trading{ID: tradeID, Pair: pair, Share: shareNumber, Price: priceNumber, CreatedAt: createdAt}, nil
}
18 changes: 18 additions & 0 deletions external/notifiers/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package notifiers

import (
"log"

"github.com/marktrs/vwap-calculation-engine/domain"
)

type Logger struct{}

func NewLogger() *Logger {
return &Logger{}
}

func (p Logger) Stream(trading domain.Trading, f float64) error {
log.Printf("[%s] vwap:%f\n", trading.Pair, f)
return nil
}
86 changes: 86 additions & 0 deletions external/providers/coinbase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package providers

import (
"fmt"

"github.com/gorilla/websocket"
"github.com/marktrs/vwap-calculation-engine/config"
"github.com/marktrs/vwap-calculation-engine/domain"
"github.com/pkg/errors"
)

type Coinbase struct {
conn *websocket.Conn
conf config.CoinbaseConfig
}

func NewCoinbase(conf config.CoinbaseConfig) (*Coinbase, error) {
var wsDialer websocket.Dialer

conn, _, err := wsDialer.Dial(conf.WebsocketEndpoint, nil)
if err != nil {
return nil, errors.Wrap(err, "unable to establish Coinbase Websocket connection")
}

return &Coinbase{
conn,
conf,
}, nil
}

func (c Coinbase) Subscribe(pairs []domain.Pair) error {
var products []string
for _, v := range pairs {
products = append(products, v.String())
}

req := coinbaseRequestMessage{
Type: c.conf.MessageSubscribeType,
Channels: []coinbaseChannel{
{
Name: c.conf.MatchesChannelName,
Products: products,
},
},
}

if err := c.conn.WriteJSON(req); err != nil {
return errors.Wrap(err, fmt.Sprintf("unable to subscribe with %v pairs", products))
}

return nil
}

func (c Coinbase) Pull(ch chan domain.Trading) error {
for {
message := coinbaseResponseMessage{}
if err := c.conn.ReadJSON(&message); err != nil {
close(ch)
return errors.Wrap(err, "unable to parse message")
}

if message.Type == c.conf.MessageErrorType {
close(ch)
return fmt.Errorf("coinbase error: %v", message)
}

if message.Type == c.conf.MessageSubscriptionsType {
continue
}

trading, err := domain.NewTrading(
message.TradeID,
message.ProductID,
message.Size,
message.Price,
message.Time,
)

if err != nil {
close(ch)
return errors.Wrap(err, "unable to parse trading data")
}

ch <- trading
}
}
23 changes: 23 additions & 0 deletions external/providers/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package providers

import "time"

type coinbaseChannel struct {
Name string `json:"name"`
Products []string `json:"product_ids"`
}

type coinbaseRequestMessage struct {
Type string `json:"type"`
Channels []coinbaseChannel `json:"channels"`
}

type coinbaseResponseMessage struct {
Type string `json:"type"`
TradeID int `json:"trade_id"`
ProductID string `json:"product_id"`
Size string `json:"size"`
Price string `json:"price"`
Side string `json:"side"`
Time time.Time `json:"time"`
}
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/marktrs/vwap-calculation-engine

go 1.17

require (
github.com/gorilla/websocket v1.4.2
github.com/pkg/errors v0.9.1
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
30 changes: 30 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package main

import (
"log"

"github.com/marktrs/vwap-calculation-engine/config"
"github.com/marktrs/vwap-calculation-engine/domain"
"github.com/marktrs/vwap-calculation-engine/external/notifiers"
"github.com/marktrs/vwap-calculation-engine/external/providers"
"github.com/marktrs/vwap-calculation-engine/service"
)

func main() {
provider, err := providers.NewCoinbase(config.Coinbase)
if err != nil {
log.Fatalln(err)
}

if err = provider.Subscribe([]domain.Pair{
domain.NewPair("BTC", "USD"),
domain.NewPair("ETH", "USD"),
domain.NewPair("ETH", "BTC"),
}); err != nil {
log.Fatalln(err)
}

logger := notifiers.NewLogger()
vwapService := service.NewVWAPService(provider, logger, config.App)
vwapService.Calculate()
}
9 changes: 9 additions & 0 deletions repository/notifier.repo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package repository

import (
"github.com/marktrs/vwap-calculation-engine/domain"
)

type Notifier interface {
Stream(domain.Trading, float64) error
}
10 changes: 10 additions & 0 deletions repository/provider.repo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package repository

import (
"github.com/marktrs/vwap-calculation-engine/domain"
)

type Provider interface {
Pull(chan domain.Trading) error
Subscribe([]domain.Pair) error
}
Loading

0 comments on commit 85cae55

Please sign in to comment.