Skip to content

Commit

Permalink
use asynchronous info command calls (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Mar 28, 2024
1 parent aafe535 commit 8fe366e
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ class AerospikeHandler(protected val client: IAerospikeClient)(implicit ec: Exec
Future(client.execute(policy, statement, operations: _*))
}

override def info(node: Node, name: String): Future[String] = {
Future(Info.request(node, name))
override def info(node: Node, commands: String*)(implicit policy: InfoPolicy): Future[Map[String, String]] = {
val listener = new ScalaInfoListener
client.info(null, listener, policy, node, commands: _*)
listener.future
}

override def scanNodeName(nodeName: String, ns: String, set: String, binNames: String*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,6 @@ trait AsyncHandler[F[_]] {
// Info
//--------------------------------------------------------

def info(node: Node, name: String): F[String]
def info(node: Node, commands: String*)
(implicit policy: InfoPolicy = null): F[Map[String, String]]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.github.reugn.aerospike.scala.listener

import com.aerospike.client.AerospikeException
import com.aerospike.client.listener.InfoListener

import java.util
import scala.collection.JavaConverters._

class ScalaInfoListener extends InfoListener with PromiseLike[Map[String, String]] {

override def onSuccess(map: util.Map[String, String]): Unit = {
success(map.asScala.toMap)
}

override def onFailure(ae: AerospikeException): Unit = {
failure(ae)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import com.aerospike.client.query.{Filter, KeyRecord}
import io.github.reugn.aerospike.scala.model.QueryStatement
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfter, FutureOutcome}
import org.scalatest.{BeforeAndAfter, FutureOutcome, OptionValues}

import scala.concurrent.{ExecutionContext, Future}

class AerospikeHandlerTest extends AsyncFlatSpec with TestCommon with Matchers with BeforeAndAfter {
class AerospikeHandlerTest extends AsyncFlatSpec
with TestCommon with Matchers with BeforeAndAfter with OptionValues {

private implicit val actorSystem: ActorSystem = ActorSystem("test")
private implicit val materializer: Materializer = Materializer(actorSystem)
Expand Down Expand Up @@ -170,6 +171,14 @@ class AerospikeHandlerTest extends AsyncFlatSpec with TestCommon with Matchers w
}
}

it should "execute info command properly" in {
val node = client.asJava.getCluster.getRandomNode
val command = "namespaces"
client.info(node, command) map { result =>
result.get(command).value shouldBe namespace
}
}

it should "scan nodes properly" in {
Future.sequence(client.asJava.getCluster.validateNodes().toList map { node =>
client.scanNode(node, namespace, set) map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,11 @@ class MonixAerospikeHandler(protected val client: IAerospikeClient)
Task(client.execute(policy, statement, operations: _*))
}

override def info(node: Node, name: String): Task[String] = {
Task(Info.request(node, name))
override def info(node: Node, commands: String*)(implicit policy: InfoPolicy): Task[Map[String, String]] = {
Task(Info.request(policy, node, commands: _*)) map {
import scala.collection.JavaConverters._
_.asScala.toMap
}
}

override def query(statement: QueryStatement)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import io.github.reugn.aerospike.scala.TestCommon
import io.github.reugn.aerospike.scala.model.QueryStatement
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Consumer
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfter, OptionValues}

import scala.concurrent.Await
import scala.concurrent.duration.Duration

class MonixAerospikeHandlerTest extends AsyncFlatSpec with TestCommon with Matchers with BeforeAndAfter {
class MonixAerospikeHandlerTest extends AsyncFlatSpec
with TestCommon with Matchers with BeforeAndAfter with OptionValues {

private val client: MonixAerospikeHandler = MonixAerospikeHandler(hostname, port)
override protected val set = "client_monix"
Expand Down Expand Up @@ -115,6 +116,14 @@ class MonixAerospikeHandlerTest extends AsyncFlatSpec with TestCommon with Match
result.status shouldBe true
}

it should "execute info command properly" in {
val node = client.asJava.getCluster.getRandomNode
val command = "namespaces"
val t = client.info(node, command)
val result = Await.result(t.runToFuture, Duration.Inf)
result.get(command).value shouldBe namespace
}

it should "operate list of BatchRecords properly" in {
val records: Seq[BatchRecord] =
List(new BatchWrite(keys(0), Array(Operation.put(new Bin("intBin", 100))))) ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,11 @@ class ZioAerospikeHandler(protected val client: IAerospikeClient)
ZIO.attemptBlocking(client.execute(policy, statement, operations: _*))
}

override def info(node: Node, name: String): Task[String] = {
ZIO.attemptBlocking(Info.request(node, name))
override def info(node: Node, commands: String*)(implicit policy: InfoPolicy): Task[Map[String, String]] = {
ZIO.attemptBlocking(Info.request(policy, node, commands: _*)) map {
import scala.collection.JavaConverters._
_.asScala.toMap
}
}

override def query(statement: QueryStatement)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import com.aerospike.client._
import com.aerospike.client.exp.{Exp, ExpOperation, ExpReadFlags}
import io.github.reugn.aerospike.scala.TestCommon
import io.github.reugn.aerospike.scala.model.QueryStatement
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfter, OptionValues}
import zio.Runtime.{default => rt}
import zio.{Unsafe, ZIO}

class ZioAerospikeHandlerTest extends AnyFlatSpec with TestCommon with Matchers with BeforeAndAfter {
class ZioAerospikeHandlerTest extends AnyFlatSpec
with TestCommon with Matchers with BeforeAndAfter with OptionValues {

private val client: ZioAerospikeHandler = ZioAerospikeHandler(hostname, port)
override protected val set = "client_zio"
Expand Down Expand Up @@ -117,6 +118,14 @@ class ZioAerospikeHandlerTest extends AnyFlatSpec with TestCommon with Matchers
result shouldBe true
}

it should "execute info command properly" in {
val node = client.asJava.getCluster.getRandomNode
val command = "namespaces"
val t = client.info(node, command)
val result = unsafeRun(t)
result.get(command).value shouldBe namespace
}

it should "query all properly" in {
val queryStatement = QueryStatement(namespace, setName = Some(set))
val t = client.query(queryStatement).runCollect
Expand Down

0 comments on commit 8fe366e

Please sign in to comment.