Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qwatch/integration test #1

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/full-test-suite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ jobs:
build:
runs-on: ubuntu-latest

env:
ACTIONS_STEP_DEBUG: true

steps:
- uses: actions/checkout@v4
- name: Setup Go
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/commands/async/qunwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestQWatchUnwatch(t *testing.T) {
}

// Make updates to the store
runQWatchScenarios(t, publisher, respParsers)
runQWATCHScenarios(t, publisher, respParsers, qWatchQuery, qWatchTestCases)

// Unwatch the query on two of the subscribers
for _, sub := range subscribers[0:2] {
Expand Down
205 changes: 182 additions & 23 deletions integration_tests/commands/async/qwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,22 @@ var qWatchTestCases = []qWatchTestCase{

// TestQWATCH tests the QWATCH functionality using raw network connections.
func TestQWATCH(t *testing.T) {
publisher, subscribers, cleanup := setupQWATCHTest(t)
publisher, subscribers, cleanup := setupQWATCHTest(t, qWatchQuery)
defer cleanup()

respParsers := subscribeToQWATCH(t, subscribers)
runQWatchScenarios(t, publisher, respParsers)
respParsers := subscribeToQWATCH(t, subscribers, qWatchQuery)
runQWATCHScenarios(t, publisher, respParsers, qWatchQuery, qWatchTestCases)
}

func TestQWATCHWithSDK(t *testing.T) {
publisher, subscribers, cleanup := setupQWATCHTestWithSDK(t)
publisher, subscribers, cleanup := setupQWATCHTestWithSDK(t, qWatchQuery)
defer cleanup()

channels := subscribeToQWATCHWithSDK(t, subscribers)
runQWatchScenarios(t, publisher, channels)
channels := subscribeToQWATCHWithSDK(t, subscribers, qWatchQuery)
runQWATCHScenarios(t, publisher, channels, qWatchQuery, qWatchTestCases)
}

func setupQWATCHTest(t *testing.T) (net.Conn, []net.Conn, func()) {
func setupQWATCHTest(t *testing.T, query string) (net.Conn, []net.Conn, func()) {
t.Helper()
publisher := getLocalConnection()
subscribers := []net.Conn{getLocalConnection(), getLocalConnection(), getLocalConnection()}
Expand All @@ -89,7 +89,7 @@ func setupQWATCHTest(t *testing.T) (net.Conn, []net.Conn, func()) {
t.Errorf("Error closing publisher connection: %v", err)
}
for _, sub := range subscribers {
FireCommand(sub, fmt.Sprintf("QUNWATCH \"%s\"", qWatchQuery))
FireCommand(sub, fmt.Sprintf("QUNWATCH \"%s\"", query))
time.Sleep(100 * time.Millisecond)
if err := sub.Close(); err != nil {
t.Errorf("Error closing subscriber connection: %v", err)
Expand All @@ -100,18 +100,19 @@ func setupQWATCHTest(t *testing.T) (net.Conn, []net.Conn, func()) {
return publisher, subscribers, cleanup
}

func setupQWATCHTestWithSDK(t *testing.T) (*redis.Client, []qWatchSDKSubscriber, func()) {
func setupQWATCHTestWithSDK(t *testing.T, query string) (*redis.Client, []qWatchSDKSubscriber, func()) {
t.Helper()
publisher := getLocalSdk()
subscribers := []qWatchSDKSubscriber{{client: getLocalSdk()}, {client: getLocalSdk()}, {client: getLocalSdk()}}
// subscribers := []qWatchSDKSubscriber{{client: getLocalSdk()}, {client: getLocalSdk()}, {client: getLocalSdk()}}
subscribers := []qWatchSDKSubscriber{{client: getLocalSdk()}}

cleanup := func() {
cleanupKeysWithSDK(publisher)
if err := publisher.Close(); err != nil {
t.Errorf("Error closing publisher connection: %v", err)
}
for _, sub := range subscribers {
if err := sub.qwatch.UnwatchQuery(context.Background(), qWatchQuery); err != nil {
if err := sub.qwatch.UnwatchQuery(context.Background(), query); err != nil {
t.Errorf("Error unwatching query: %v", err)
}
if err := sub.client.Close(); err != nil {
Expand All @@ -123,11 +124,11 @@ func setupQWATCHTestWithSDK(t *testing.T) (*redis.Client, []qWatchSDKSubscriber,
return publisher, subscribers, cleanup
}

func subscribeToQWATCH(t *testing.T, subscribers []net.Conn) []*clientio.RESPParser {
func subscribeToQWATCH(t *testing.T, subscribers []net.Conn, query string) []*clientio.RESPParser {
t.Helper()
respParsers := make([]*clientio.RESPParser, len(subscribers))
for i, subscriber := range subscribers {
rp := fireCommandAndGetRESPParser(subscriber, fmt.Sprintf("QWATCH \"%s\"", qWatchQuery))
rp := fireCommandAndGetRESPParser(subscriber, fmt.Sprintf("QWATCH \"%s\"", query))
assert.Assert(t, rp != nil)
respParsers[i] = rp

Expand All @@ -143,27 +144,30 @@ func subscribeToQWATCH(t *testing.T, subscribers []net.Conn) []*clientio.RESPPar
return respParsers
}

func subscribeToQWATCHWithSDK(t *testing.T, subscribers []qWatchSDKSubscriber) []<-chan *redis.QMessage {
func subscribeToQWATCHWithSDK(t *testing.T, subscribers []qWatchSDKSubscriber, query string) []<-chan *redis.QMessage {
t.Helper()
ctx := context.Background()
channels := make([]<-chan *redis.QMessage, len(subscribers))
for i, subscriber := range subscribers {
qwatch := subscriber.client.QWatch(ctx)
subscribers[i].qwatch = qwatch
assert.Assert(t, qwatch != nil)
err := qwatch.WatchQuery(ctx, qWatchQuery)
err := qwatch.WatchQuery(ctx, query)
assert.NilError(t, err)
channels[i] = qwatch.Channel()
<-channels[i] // Get the first message
}
return channels
}

func runQWatchScenarios(t *testing.T, publisher interface{}, receivers interface{}) {
func runQWATCHScenarios(t *testing.T, publisher interface{}, receivers interface{}, query string, tests []qWatchTestCase) {
t.Helper()
for _, tc := range qWatchTestCases {
for _, tc := range tests {
fmt.Println("publishing updates for tc: ", fmt.Sprintf("%v%v-score:%v updates: %v", tc.key, tc.userID, tc.score, tc.expectedUpdates))
publishUpdate(t, publisher, tc)
verifyUpdates(t, receivers, tc.expectedUpdates)
time.Sleep(100 * time.Millisecond)
fmt.Println("verifying updates for tc: ", fmt.Sprintf("%v%v-score:%v updates: %v", tc.key, tc.userID, tc.score, tc.expectedUpdates))
verifyUpdates(t, receivers, tc.expectedUpdates, query)
}
}

Expand All @@ -178,18 +182,18 @@ func publishUpdate(t *testing.T, publisher interface{}, tc qWatchTestCase) {
}
}

func verifyUpdates(t *testing.T, receivers interface{}, expectedUpdates [][]interface{}) {
func verifyUpdates(t *testing.T, receivers interface{}, expectedUpdates [][]interface{}, query string) {
for _, expectedUpdate := range expectedUpdates {
switch r := receivers.(type) {
case []*clientio.RESPParser:
verifyRESPUpdates(t, r, expectedUpdate)
verifyRESPUpdates(t, r, expectedUpdate, query)
case []<-chan *redis.QMessage:
verifySDKUpdates(t, r, expectedUpdate)
}
}
}

func verifyRESPUpdates(t *testing.T, respParsers []*clientio.RESPParser, expectedUpdate []interface{}) {
func verifyRESPUpdates(t *testing.T, respParsers []*clientio.RESPParser, expectedUpdate []interface{}, query string) {
for _, rp := range respParsers {
v, err := rp.DecodeOne()
assert.NilError(t, err)
Expand All @@ -198,20 +202,57 @@ func verifyRESPUpdates(t *testing.T, respParsers []*clientio.RESPParser, expecte
t.Errorf("Type assertion to []interface{} failed for value: %v", v)
return
}
assert.DeepEqual(t, []interface{}{sql.Qwatch, qWatchQuery, expectedUpdate}, update)
assert.DeepEqual(t, []interface{}{sql.Qwatch, query, expectedUpdate}, update)
}
}

func verifySDKUpdates(t *testing.T, channels []<-chan *redis.QMessage, expectedUpdate []interface{}) {
for _, ch := range channels {
v := <-ch
assert.Equal(t, len(v.Updates), len(expectedUpdate), v.Updates)
fmt.Println("actual update length: ", len(v.Updates), "expected update length: ", len(expectedUpdate))
fmt.Println("actual updates: ", v.Updates)
assert.Equal(t, len(expectedUpdate), len(v.Updates), "updates length do not match. actual update: %v, expected update: %v", len(v.Updates), len(expectedUpdate))
for i, update := range v.Updates {
assert.DeepEqual(t, expectedUpdate[i], []interface{}{update.Key, update.Value})
}
}
}

// Test cases for WHERE clause

var qWatchWhereQuery = "SELECT $key, $value WHERE $value > 50 and $key like 'match:10?:*' ORDER BY $value desc"

var qWatchWhereTestCases = []qWatchTestCase{
{"match:100:user", 0, 55, [][]interface{}{
{[]interface{}{"match:100:user:0", int64(55)}},
}},
{"match:100:user", 1, 60, [][]interface{}{
{[]interface{}{"match:100:user:1", int64(60)}, []interface{}{"match:100:user:0", int64(55)}},
}},
{"match:100:user", 2, 80, [][]interface{}{
{[]interface{}{"match:100:user:2", int64(80)}, []interface{}{"match:100:user:1", int64(60)}, []interface{}{"match:100:user:0", int64(55)}},
}},
// {"match:100:user", 0, 90, [][]interface{}{
// {[]interface{}{"match:100:user:0", int64(90)}, []interface{}{"match:100:user:2", int64(80)}, []interface{}{"match:100:user:1", int64(60)}},
// }},
}

func TestQWATCHWhere(t *testing.T) {
publisher, subscribers, cleanup := setupQWATCHTest(t, qWatchWhereQuery)
defer cleanup()

respParsers := subscribeToQWATCH(t, subscribers, qWatchWhereQuery)
runQWATCHScenarios(t, publisher, respParsers, qWatchWhereQuery, qWatchWhereTestCases)
}

func TestQWATCHWhereWithSDK(t *testing.T) {
publisher, subscribers, cleanup := setupQWATCHTestWithSDK(t, qWatchWhereQuery)
defer cleanup()

channels := subscribeToQWATCHWithSDK(t, subscribers, qWatchWhereQuery)
runQWATCHScenarios(t, publisher, channels, qWatchWhereQuery, qWatchWhereTestCases)
}

type JSONTestCase struct {
key string
value string
Expand Down Expand Up @@ -497,3 +538,121 @@ func cleanupJSONOrderByKeys(publisher net.Conn) {
FireCommand(publisher, fmt.Sprintf("DEL player:%d", i))
}
}

func TestQwatchWithJSONWhere(t *testing.T) {
publisher, subscriber, cleanup, query := setupJSONWhereTest(t)
defer cleanup()

rp := subscribeToJSONWhereByQuery(t, subscriber, query)
runJSONWhereTests(t, publisher, rp)
}

func setupJSONWhereTest(t *testing.T) (net.Conn, net.Conn, func(), string) {
query := "SELECT $key, $value WHERE '$value.score' >= 90 AND $key like 'player:*' ORDER BY $value.score DESC"
publisher := getLocalConnection()
subscriber := getLocalConnection()

cleanup := func() {
cleanupJSONWhereByKeys(publisher)
if err := publisher.Close(); err != nil {
t.Errorf("Error closing publisher connection: %v", err)
}
FireCommand(subscriber, fmt.Sprintf(`QUNWATCH \"%s\"`, query))
time.Sleep(100 * time.Millisecond)
if err := subscriber.Close(); err != nil {
t.Errorf("Error closing subscriber connection: %v", err)
}
}
return publisher, subscriber, cleanup, query
}

func cleanupJSONWhereByKeys(publisher net.Conn) {
for i := 1; i <= 4; i++ {
FireCommand(publisher, fmt.Sprintf("DEL player:%d", i))
}
}

func subscribeToJSONWhereByQuery(t *testing.T, subscriber net.Conn, query string) *clientio.RESPParser {
rp := fireCommandAndGetRESPParser(subscriber, fmt.Sprintf(`QWATCH "%s"`, query))
assert.Assert(t, rp != nil)
v, err := rp.DecodeOne()
assert.NilError(t, err)
length := len(v.([]interface{}))
assert.Equal(t, 3, length, fmt.Sprintf("Expected 3 elements, got %v", length))
return rp
}

func runJSONWhereTests(t *testing.T, publisher net.Conn, rp *clientio.RESPParser) {
tests := []struct {
key string
value string
expectedUpdates [][]interface{}
}{
{
key: "player:1",
value: `{"name":"Alice","score":100}`,
expectedUpdates: [][]interface{}{
{[]interface{}{"player:1", map[string]interface{}{"name": "Alice", "score": float64(100)}}},
},
},
{
key: "player:2",
value: `{"name":"Charlie","score":90}`,
expectedUpdates: [][]interface{}{
{[]interface{}{"player:1", map[string]interface{}{"name": "Alice", "score": float64(100)}},
[]interface{}{"player:2", map[string]interface{}{"name": "Charlie", "score": float64(90)}}},
},
},
{
key: "player:3",
value: `{"name":"David","score":95}`,
expectedUpdates: [][]interface{}{
{[]interface{}{"player:1", map[string]interface{}{"name": "Alice", "score": float64(100)}},
[]interface{}{"player:3", map[string]interface{}{"name": "David", "score": float64(95)}},
[]interface{}{"player:2", map[string]interface{}{"name": "Charlie", "score": float64(90)}}},
},
},
}

for _, tc := range tests {
FireCommand(publisher, fmt.Sprintf(`JSON.SET %s $ %s`, tc.key, tc.value))
verifyJSONWhereByUpdates(t, rp, tc)
}
}

func verifyJSONWhereByUpdates(t *testing.T, rp *clientio.RESPParser, tc struct {
key string
value string
expectedUpdates [][]interface{}
}) {
// decode the response
v, err := rp.DecodeOne()
assert.NilError(t, err, "Failed to decode response")

// cast the response
response, ok := v.([]interface{})
assert.Assert(t, ok, "Response is expected to be of the type []interface{}: %v", v)

// verify response structure
assert.Equal(t, 3, len(response), "Expected response to have 3 elements, but it has %v elements", len(response))
assert.Equal(t, sql.Qwatch, response[0], "First element expected to be QWATCH constant")

// extract updates
updates, ok := response[2].([]interface{})
assert.Assert(t, ok, "Updates expected to be of type []interface: %v", response[2])

// verify updates
for i, expected := range tc.expectedUpdates[0] {
actual, ok := updates[i].([]interface{})
assert.Assert(t, ok, "Actual update expected to be of type []interface{}: %v", updates[i])

// verify key
assert.Equal(t, expected.([]interface{})[0], actual[0], "Key did not match at index: %d", i)

// verify value
var actualJSON interface{}
err := sonic.UnmarshalString(actual[1].(string), &actualJSON)
assert.NilError(t, err, "Failed to unmarshal JSON at index: %d", i)
assert.DeepEqual(t, expected.([]interface{})[1], actualJSON)
}
}
20 changes: 10 additions & 10 deletions integration_tests/commands/async/toggle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ func TestJSONToggle(t *testing.T) {
commands: []string{`JSON.SET user $ ` + simpleJSON, "JSON.TOGGLE user $.name"},
expected: []interface{}{"OK", []any{int64(0)}},
},
{
name: "JSON.TOGGLE with non-existing key",
commands: []string{"JSON.TOGGLE user $.flag"},
expected: []interface{}{"ERR could not perform this operation on a key that doesn't exist"},
},
{
name: "JSON.TOGGLE with invalid path",
commands: []string{"JSON.TOGGLE user $.invalidPath"},
expected: []interface{}{"ERR could not perform this operation on a key that doesn't exist"},
},
// {
// name: "JSON.TOGGLE with non-existing key",
// commands: []string{"JSON.TOGGLE user $.flag"},
// expected: []interface{}{"ERR could not perform this operation on a key that doesn't exist"},
// },
// {
// name: "JSON.TOGGLE with invalid path",
// commands: []string{"JSON.TOGGLE user $.invalidPath"},
// expected: []interface{}{"ERR could not perform this operation on a key that doesn't exist"},
// },
{
name: "JSON.TOGGLE with invalid command format",
commands: []string{"JSON.TOGGLE testKey"},
Expand Down
Loading