Skip to content

Commit

Permalink
feat: implement SynchronizerAppCreator for managing application synch…
Browse files Browse the repository at this point in the history
…ronization and add related tests
  • Loading branch information
sandhilt committed Jan 31, 2025
1 parent 6950a26 commit 4f93a1a
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 11 deletions.
3 changes: 3 additions & 0 deletions pkg/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,16 @@ func NewSupervisorGraphQL(opts BootstrapOpts) supervisor.SupervisorWorker {
inputAbiDecoder,
)

synchronizerAppCreate := synchronizernode.NewSynchronizerAppCreator(container.GetApplicationRepository(), rawRepository)

synchronizerWorker := synchronizernode.NewSynchronizerCreateWorker(
container.GetInputRepository(),
container.GetRawInputRepository(),
dbRawUrl,
rawRepository,
&synchronizerUpdate,
container.GetOutputDecoder(),
synchronizerAppCreate,
synchronizerReport,
synchronizerOutputUpdate,
container.GetRawOutputRefRepository(),
Expand Down
3 changes: 2 additions & 1 deletion pkg/convenience/model/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package model

import (
"context"
"database/sql"
"math/big"
"time"

Expand Down Expand Up @@ -67,7 +68,7 @@ type ConvenienceApplication struct {
TemplateURI string `db:"template_uri"`
EpochLength uint64 `db:"epoch_length"`
State string `db:"state"`
Reason string `db:"reason,omitempty"`
Reason sql.NullString `db:"reason,omitempty"`
LastProcessedBlock uint64 `db:"last_processed_block"`
LastClaimCheckBlock uint64 `db:"last_claim_check_block"`
LastOutputCheckBlock uint64 `db:"last_output_check_block"`
Expand Down
14 changes: 7 additions & 7 deletions pkg/convenience/repository/input_raw_references_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (r *RawInputRefRepository) Create(ctx context.Context, rawInput RawInputRef
}

_, err = exec.ExecContext(ctx, `INSERT INTO convenience_input_raw_references (
id, app_id, input_index, app_contract, status, chain_id, created_at)
id, app_id, input_index, app_contract, status, chain_id, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)`,
rawInput.ID, rawInput.AppID, rawInput.InputIndex,
rawInput.AppContract, rawInput.Status, rawInput.ChainID,
Expand All @@ -103,9 +103,9 @@ func (r *RawInputRefRepository) Create(ctx context.Context, rawInput RawInputRef
func (r *RawInputRefRepository) GetLatestInputRef(ctx context.Context) (*RawInputRef, error) {
var inputRef RawInputRef
err := r.Db.GetContext(ctx, &inputRef, `
SELECT * FROM convenience_input_raw_references
ORDER BY
created_at DESC, input_index DESC, app_id DESC
SELECT * FROM convenience_input_raw_references
ORDER BY
created_at DESC, input_index DESC, app_id DESC
LIMIT 1
`)

Expand All @@ -114,8 +114,8 @@ func (r *RawInputRefRepository) GetLatestInputRef(ctx context.Context) (*RawInpu
slog.Warn("No raw input references found")
return nil, nil
}
slog.Error("Failed to get latest raw input ref", "err", err)
return nil, err
slog.Error("Failed to get latest raw input ref", "err", err)

Check failure on line 117 in pkg/convenience/repository/input_raw_references_repository.go

View workflow job for this annotation

GitHub Actions / Build and test

File is not properly formatted (gofmt)
return nil, err
}

slog.Debug("Latest InputRef fetched",
Expand Down Expand Up @@ -157,7 +157,7 @@ func (r *RawInputRefRepository) FindFirstInputByStatusNone(ctx context.Context)
func (r *RawInputRefRepository) FindByInputIndexAndAppContract(ctx context.Context, inputIndex uint64, appContract *common.Address) (*RawInputRef, error) {
var inputRef RawInputRef
err := r.Db.GetContext(ctx, &inputRef, `
SELECT * FROM convenience_input_raw_references
SELECT * FROM convenience_input_raw_references
WHERE input_index = $1 and app_contract = $2
LIMIT 1`, inputIndex, appContract.Hex())
if err != nil {
Expand Down
24 changes: 21 additions & 3 deletions pkg/convenience/synchronizer/voucher_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"io"
"log/slog"
"net/http"
"net/url"
"os"
"strings"
)

Expand Down Expand Up @@ -55,15 +57,29 @@ GRAPH_QL_URL
const DefaultBatchSize = 10

type VoucherFetcher struct {
Url string
Url *url.URL
CursorAfter string
BatchSize int
Query string
}

func getEnvWithDefault(key, defaultValue string) string {
value, ok := os.LookupEnv(key)
if !ok {
return defaultValue
}
return value
}

func NewVoucherFetcher() *VoucherFetcher {
urlStr := getEnvWithDefault("GRAPH_QL_URL", "http://localhost:8080/graphql")
location, err := url.Parse(urlStr)
if err != nil {
slog.Error("Error parsing URL:", "error", err)
return nil
}
return &VoucherFetcher{
Url: "http://localhost:8080/graphql",
Url: location,
CursorAfter: "",
BatchSize: DefaultBatchSize,
Query: query,
Expand All @@ -90,8 +106,10 @@ func (v *VoucherFetcher) Fetch() (*VoucherResponse, error) {
return nil, err
}

url := v.Url.String()

// Make a POST request to the GraphQL endpoint
req, err := http.NewRequest("POST", v.Url, bytes.NewBuffer(payload))
req, err := http.NewRequest("POST", url, bytes.NewBuffer(payload))
if err != nil {
slog.Error("Error creating request:", "error", err)
return nil, err
Expand Down
132 changes: 132 additions & 0 deletions pkg/convenience/synchronizer_node/raw_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,138 @@ func NewRawRepository(connectionURL string, db *sqlx.DB) *RawRepository {
return &RawRepository{connectionURL, db}
}

func (s *RawRepository) GetApplicationRef(ctx context.Context, appID uint64) ([]model.ConvenienceApplication, error) {
apps := []model.ConvenienceApplication{}
query := `
SELECT
id,
name,
iapplication_address as application_address,
iconsensus_address as consensus_address,
template_hash,
template_uri,
epoch_length,
state,
reason,
last_processed_block,
last_claim_check_block,
last_output_check_block,
processed_inputs,
created_at,
updated_at
FROM
application
WHERE
id >= $1
ORDER BY id ASC
LIMIT $2
`
res, err := s.Db.QueryxContext(ctx, query, appID, LIMIT)

if err != nil {
slog.Error("Failed to execute query in CheckStatusApplicationRef", "error", err)
return nil, err
}

for res.Next() {
var app model.ConvenienceApplication
err := res.StructScan(&app)
if err != nil {
slog.Error("Failed to scan row into Application struct", "error", err)
return nil, err
}
apps = append(apps, app)
}

return apps, nil
}

func (s *RawRepository) GetLatestApp(ctx context.Context) (*model.ConvenienceApplication, error) {
var output *model.ConvenienceApplication
query := `
SELECT
id,
name,
iapplication_address as application_address,
iconsensus_address as consensus_address,
template_hash,
template_uri,
epoch_length,
state,
reason,
last_processed_block,
last_claim_check_block,
last_output_check_block,
processed_inputs,
created_at,
updated_at
FROM
application
ORDER BY
id ASC
LIMIT 1
`
err := s.Db.GetContext(ctx, &output, query)

if err != nil {
if errors.Is(err, sql.ErrNoRows) {
slog.Warn("No application found")
return nil, nil
}
slog.Error("Failed to get latest application ref", "error", err)
return nil, err
}

slog.Debug("Latest App fetched", "id", output.ID, "name", output.Name, "address", output.ApplicationAddress)

return output, nil
}

func (s *RawRepository) FindAllAppsRef(ctx context.Context) ([]model.ConvenienceApplication, error) {
query := `
SELECT
id,
name,
iapplication_address as application_address,
iconsensus_address as consensus_address,
template_hash,
template_uri,
epoch_length,
state,
reason,
last_processed_block,
last_claim_check_block,
last_output_check_block,
processed_inputs,
created_at,
updated_at
FROM
application
ORDER BY
id ASC
LIMIT $1
`

apps := []model.ConvenienceApplication{}
result, err := s.Db.QueryxContext(ctx, query, LIMIT)
if err != nil {
slog.Error("Failed to execute query in FindAllAppsRef", "error", err)
return nil, err
}

for result.Next() {
var app model.ConvenienceApplication
err := result.StructScan(&app)
if err != nil {
slog.Error("Failed to scan row into Application struct", "error", err)
return nil, err
}
apps = append(apps, app)
}

return apps, nil
}

func (s *RawRepository) First50RawInputsGteRefWithStatus(ctx context.Context, inputRef repository.RawInputRef, status string) ([]RawInput, error) {
query := `
SELECT
Expand Down
87 changes: 87 additions & 0 deletions pkg/convenience/synchronizer_node/synchronizer_app_create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package synchronizernode

import (
"context"
"log/slog"

"github.com/cartesi/rollups-graphql/pkg/convenience/repository"
)

type SynchronizerAppCreator struct {
AppRepository *repository.ApplicationRepository
RawRepository *RawRepository
}

func NewSynchronizerAppCreator(
AppRepository *repository.ApplicationRepository,
RawRepository *RawRepository,
) *SynchronizerAppCreator {
return &SynchronizerAppCreator{
AppRepository,
RawRepository,
}
}

func (s *SynchronizerAppCreator) startTransaction(ctx context.Context) (context.Context, error) {
db := s.AppRepository.Db
ctxWithTx, err := repository.StartTransaction(ctx, &db)
if err != nil {
return ctx, err
}
return ctxWithTx, nil
}

func (s *SynchronizerAppCreator) rollbackTransaction(ctx context.Context) {
tx, hasTx := repository.GetTransaction(ctx)
if hasTx && tx != nil {
err := tx.Rollback()
if err != nil {
slog.Error("transaction rollback error", "err", err)
panic(err)
}
}
}

func (s *SynchronizerAppCreator) commitTransaction(ctx context.Context) error {
tx, hasTx := repository.GetTransaction(ctx)
if hasTx && tx != nil {
return tx.Commit()
}
return nil
}

func (s SynchronizerAppCreator) SyncApps(ctx context.Context) error {
txCtx, err := s.startTransaction(ctx)
if err != nil {
return err
}
err = s.syncApps(txCtx)
if err != nil {
s.rollbackTransaction(txCtx)
return err
}
err = s.commitTransaction(txCtx)
if err != nil {
return err
}
return nil
}

func (s *SynchronizerAppCreator) syncApps(ctx context.Context) error {
lastAppRef, err := s.RawRepository.GetLatestApp(ctx)
if err != nil {
return err
}
apps, err := s.RawRepository.GetApplicationRef(ctx, lastAppRef.ID)
if err != nil {
return err
}
for _, app := range apps {
_, err = s.AppRepository.Create(ctx, &app)
if err != nil {
return err
}
}

return nil
}
Loading

0 comments on commit 4f93a1a

Please sign in to comment.