Skip to content

nats-io/jsm.go

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Overview

This is a Go based library to manage and interact with JetStream.

This package is the underlying library for the nats CLI, our Terraform provider, GitHub Actions and Kubernetes CRDs. It's essentially a direct wrapping of the JetStream API with few userfriendly features and requires deep technical knowledge of the JetStream internals.

For typical end users we suggest the nats.go package.

Initialization

This package is modeled as a Manager instance that receives a NATS Connection and sets default timeouts and validation for all interaction with JetStream.

Multiple Managers can be used in your application each with own timeouts and connection.

mgr, _ := jsm.New(nc, jsm.WithTimeout(10*time.Second))

This creates a Manager with a 10 second timeout when accessing the JetStream API. All examples below assume a manager was created as above.

Schema Registry

All the JetStream API messages and some events and advisories produced by the NATS Server have JSON Schemas associated with them, the api package has a Schema Registry and helpers to discover and interact with these.

The Schema Registry can be accessed on the cli in the nats schemas command where you can list, search and view schemas and validate data based on schemas.

Example Message

To retrieve the Stream State for a specific Stream one accesses the $JS.API.STREAM.INFO.<stream> API, this will respond with data like below:

{
  "type": "io.nats.jetstream.api.v1.stream_info_response",
  "config": {
    "name": "TESTING",
    "subjects": [
      "js.in.testing"
    ],
    "retention": "limits",
    "max_consumers": -1,
    "max_msgs": -1,
    "max_bytes": -1,
    "discard": "old",
    "max_age": 0,
    "max_msg_size": -1,
    "storage": "file",
    "num_replicas": 1,
    "duplicate_window": 120000000000
  },
  "created": "2020-10-09T12:40:07.648216464Z",
  "state": {
    "messages": 1,
    "bytes": 81,
    "first_seq": 1017,
    "first_ts": "2020-10-09T19:43:40.867729419Z",
    "last_seq": 1017,
    "last_ts": "2020-10-09T19:43:40.867729419Z",
    "consumer_count": 1
  }
}

Here the type of the message is io.nats.jetstream.api.v1.stream_info_response, the API package can help parse this into the correct format.

Message Schemas

Given a message kind one can retrieve the full JSON Schema as bytes:

schema, _ := api.Schema("io.nats.jetstream.api.v1.stream_info_response")

Once can also retrieve it based on a specific message content:

schemaType, _ := api.SchemaTypeForMessage(m.Data)
schema, _ := api.Schema(schemaType)

Several other Schema related helpers exist to search Schemas, fine URLs and more. See the api Reference.

Parsing Message Content

JetStream will produce metrics about message Acknowledgments, API audits and more, here we subscribe to the metric subject and print a specific received message type.

nc.Subscribe("$JS.EVENT.ADVISORY.>", func(m *nats.Msg){
    kind, msg, _ := api.ParseMessage(m.Data)
    log.Printf("Received message of type %s", kind) // io.nats.jetstream.advisory.v1.api_audit

    switch e := event.(type){
    case advisory.JetStreamAPIAuditV1:
        fmt.Printf("Audit event on subject %s from %s\n", e.Subject, e.Client.Name)
    }
})

Above we gain full access to all contents of the message in it's native format, but we need to know in advance what we will get, we can render the messages as text in a generic way though:

nc.Subscribe("$JS.EVENT.ADVISORY.>", func(m *nats.Msg){
    kind, msg, _ := api.ParseMessage(m.Data)

    if kind == "io.nats.unknown_message" {
        return // a message without metadata or of a unknown format was received
    }

    ne, ok := event.(api.Event)
    if !ok {
        return fmt.Errorf("event %q does not implement the Event interface", kind)
    }

    err = api.RenderEvent(os.Stdout, ne, api.TextCompactFormat)
    if err != nil {
        return fmt.Errorf("display failed: %s", err)
    }
})

This will produce output like:

11:25:49 [JS API] $JS.API.STREAM.INFO.TESTING $G
11:25:52 [JS API] $JS.API.STREAM.NAMES $G
11:25:52 [JS API] $JS.API.STREAM.NAMES $G
11:25:53 [JS API] $JS.API.STREAM.INFO.TESTING $G

The api.TextCompactFormat is one of a few we support, also api.TextExtendedFormat for a full multi line format, api.ApplicationCloudEventV1Format for CloudEvents v1 format and api.ApplicationJSONFormat for JSON.

API Validation

The data structures sent to JetStream can be validated before submission to NATS which can speed up user feedback and provide better errors.

type SchemaValidator struct{}

func (v SchemaValidator) ValidateStruct(data any, schemaType string) (ok bool, errs []string) {
	s, err := api.Schema(schemaType)
	if err != nil {
		return false, []string{"unknown schema type %s", schemaType}
	}

	ls := gojsonschema.NewBytesLoader(s)
	ld := gojsonschema.NewGoLoader(data)
	result, err := gojsonschema.Validate(ls, ld)
	if err != nil {
		return false, []string{fmt.Sprintf("validation failed: %s", err)}
	}

	if result.Valid() {
		return true, nil
	}

	errors := make([]string, len(result.Errors()))
	for i, verr := range result.Errors() {
		errors[i] = verr.String()
	}

	return false, errors
}

This is a api.StructValidator implementation that uses JSON Schema to do deep validation of the structures sent to JetStream.

This can be used by the Manager to validate all API access.

mgr, _ := jsm.New(nc, jsm.WithAPIValidation(new(SchemaValidator)))