Skip to content

Commit

Permalink
feat: use binary frame instead of plain text to send packet
Browse files Browse the repository at this point in the history
  • Loading branch information
RTAkland committed Jan 4, 2025
1 parent 68c8eea commit d03e4a7
Show file tree
Hide file tree
Showing 15 changed files with 356 additions and 86 deletions.
27 changes: 27 additions & 0 deletions api/src/main/kotlin/cn/rtast/kwsify/Base64.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright © 2025 RTAkland
* Author: RTAkland
* Date: 2025/1/3
*/

@file:Suppress("unused")

package cn.rtast.kwsify

import java.util.Base64

fun String.encodeToBase64(): String {
return Base64.getEncoder().encodeToString(this.toByteArray(Charsets.UTF_8))
}

fun ByteArray.encodeToBase64(): String {
return Base64.getEncoder().encodeToString(this)
}

fun String.decodeToString(): String {
return String(Base64.getDecoder().decode(this), Charsets.UTF_8)
}

fun String.decodeToByteArray(): ByteArray {
return Base64.getDecoder().decode(this)
}
5 changes: 5 additions & 0 deletions api/src/main/kotlin/cn/rtast/kwsify/IOperation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ interface IOperation {
*/
fun publish(channel: String, payload: String): Boolean

/**
* 发布二进制消息
*/
fun publish(channel: String, payload: ByteArray): Boolean

/**
* 订阅频道消息
*/
Expand Down
33 changes: 17 additions & 16 deletions api/src/main/kotlin/cn/rtast/kwsify/Kwsify.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,34 @@

package cn.rtast.kwsify

import cn.rtast.kwsify.entity.OutboundMessagePacket
import cn.rtast.kwsify.entity.PublishPacket
import cn.rtast.kwsify.entity.SubscribePacket
import cn.rtast.kwsify.entity.UnsubscribePacket
import cn.rtast.kwsify.entity.*
import cn.rtast.kwsify.enums.OPCode
import cn.rtast.kwsify.util.fromJson
import cn.rtast.kwsify.util.send
import cn.rtast.kwsify.util.toJson
import org.java_websocket.client.WebSocketClient
import org.java_websocket.handshake.ServerHandshake
import java.net.URI
import java.nio.ByteBuffer
import java.util.*

class Kwsify(private val address: String) : IOperation {
private lateinit var websocket: WebSocketClient
private val subscribers = mutableMapOf<String, MutableList<Subscriber>>()
private val reconnectInterval = 5L

override fun connect(channel: String, broadcastSelf: Boolean) {
websocket = object : WebSocketClient(URI(address)) {
override fun onOpen(handshakedata: ServerHandshake) {
val authPacket = SubscribePacket(OPCode.JOIN, UUID.randomUUID(), channel, broadcastSelf)
val authPacket = SubscribePacket(OPCode.JOIN, UUID.randomUUID(), channel, broadcastSelf).toByteArray()
websocket.send(authPacket)
}

override fun onMessage(message: String) {
val inboundPacket = message.fromJson<OutboundMessagePacket>()
val channel = inboundPacket.channel
}

override fun onMessage(bytes: ByteBuffer) {
val packet = OutboundMessageBytesPacket.fromByteArray(bytes.array())
val channel = packet.channel
subscribers[channel]?.forEach { subscriber ->
subscriber.onMessage(channel, inboundPacket.body, inboundPacket)
subscriber.onMessage(channel, bytes.array(), packet)
}
}

Expand All @@ -56,10 +55,12 @@ class Kwsify(private val address: String) : IOperation {
}

override fun publish(channel: String, payload: String): Boolean {
val packet = PublishPacket(OPCode.PUBLISH, payload, channel)
subscribers[channel]?.forEach { subscriber ->
websocket.send(packet)
}
return this.publish(channel, payload.toByteArray())
}

override fun publish(channel: String, payload: ByteArray): Boolean {
val packet = PublishPacket(OPCode.PUBLISH, payload, channel).toByteArray()
websocket.send(packet)
return true
}

Expand All @@ -76,7 +77,7 @@ class Kwsify(private val address: String) : IOperation {
try {
if (subscribers.containsKey(channel)) {
subscribers.remove(channel)
val packet = UnsubscribePacket(OPCode.EXIT_CHANNEL, channel)
val packet = UnsubscribePacket(OPCode.EXIT_CHANNEL, channel).toByteArray()
websocket.send(packet)
return true
}
Expand Down
6 changes: 3 additions & 3 deletions api/src/main/kotlin/cn/rtast/kwsify/Subscriber.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

package cn.rtast.kwsify

import cn.rtast.kwsify.entity.OutboundMessagePacket
import cn.rtast.kwsify.entity.OutboundMessageBytesPacket

interface Subscriber {
/**
* 接收到消息时
* 接收到二进制消息时
*/
fun onMessage(channel: String, payload: String, packet: OutboundMessagePacket)
fun onMessage(channel: String, payload: ByteArray, packet: OutboundMessageBytesPacket) {}

/**
* websocket连接断开时
Expand Down
14 changes: 8 additions & 6 deletions api/src/test/kotlin/test/kwsify/TestClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ package test.kwsify

import cn.rtast.kwsify.Kwsify
import cn.rtast.kwsify.Subscriber
import cn.rtast.kwsify.entity.OutboundMessagePacket
import cn.rtast.kwsify.entity.OutboundMessageBytesPacket

fun main() {
val wsify = Kwsify("ws://127.0.0.1:8080")
wsify.subscribe("test", true, object : Subscriber {
override fun onMessage(channel: String, payload: String, packet: OutboundMessagePacket) {
println(packet.body)
Thread.sleep(500L)
wsify.publish("test", "114514")

override fun onMessage(channel: String, payload: ByteArray, packet: OutboundMessageBytesPacket) {
println(String(packet.body))
}

override fun onClosed(channel: String) {
Expand All @@ -26,5 +25,8 @@ fun main() {
}
})
Thread.sleep(1000L)
wsify.publish("test", "114514")
while (true) {
wsify.publish("test", "114514")
Thread.sleep(1000)
}
}
7 changes: 6 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
kotlin.code.style=official
appVersion=1.1.0
libVersion=1.1.1
libVersion=1.1.1

#systemProp.http.proxyHost=127.0.0.1
#systemProp.http.proxyPort=12334
#systemProp.https.proxyHost=127.0.0.1
#systemProp.https.proxyPort=12334
36 changes: 36 additions & 0 deletions src/main/kotlin/cn/rtast/kwsify/entity/OPCodePacket.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright © 2025 RTAkland
* Author: RTAkland
* Date: 2025/1/3
*/


package cn.rtast.kwsify.entity

import java.nio.ByteBuffer

data class OPCodePacket(
val op: Int,
val channel: String,
) {
fun toByteArray(): ByteArray {
val buffer = ByteBuffer.allocate(4 + 4 + channel.toByteArray().size)
return buffer.apply {
putInt(op)
putInt(channel.toByteArray().size)
put(channel.toByteArray())
}.array()
}

companion object {
fun fromByteArray(buffer: ByteBuffer): OPCodePacket {
val op = buffer.int
val channelSize = buffer.int
val channelBytes = ByteArray(channelSize).apply {
buffer.get(this)
}
val channel = String(channelBytes)
return OPCodePacket(op, channel)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright © 2025 RTAkland
* Author: RTAkland
* Date: 2025/1/3
*/


package cn.rtast.kwsify.entity

import java.nio.ByteBuffer
import java.util.*

data class OutboundMessageBytesPacket(
val op: Int,
val body: ByteArray,
val channel: String,
val sender: Sender
) {
data class Sender(
val host: String,
val port: Int,
val address: String,
val uuid: UUID
)


fun toByteArray(): ByteArray {
val hostSize = sender.host.toByteArray().size
val addressSize = sender.address.toByteArray().size
val channelSize = channel.toByteArray().size
val totalLength = body.size + channelSize + hostSize + addressSize + 8 + 8 + 4 + 4 + 4 + 4 + 8
val buffer = ByteBuffer.allocate(totalLength)
buffer.putInt(op) // op: 4 bytes
buffer.putInt(body.size) // body size: 4 bytes
buffer.put(body) // body data
buffer.putInt(channelSize) // channel size: 4 bytes
buffer.put(channel.toByteArray()) // channel data
buffer.putInt(hostSize) // host size: 4 bytes
buffer.put(sender.host.toByteArray()) // host data
buffer.putInt(sender.port) // port: 4 bytes
buffer.putInt(addressSize) // address size: 4 bytes
buffer.put(sender.address.toByteArray()) // address data
buffer.putLong(sender.uuid.mostSignificantBits) // UUID most significant bits: 8 bytes
buffer.putLong(sender.uuid.leastSignificantBits) // UUID least significant bits: 8 bytes
return buffer.array()
}

companion object {
fun fromByteArray(bytes: ByteArray): OutboundMessageBytesPacket {
val buffer = ByteBuffer.wrap(bytes)
val op = buffer.getInt()
val bodySize = buffer.getInt()
val body = ByteArray(bodySize)
buffer.get(body)
val channelSize = buffer.getInt()
val channelBytes = ByteArray(channelSize)
buffer.get(channelBytes)
val channel = String(channelBytes)
val hostSize = buffer.getInt()
val hostBytes = ByteArray(hostSize)
buffer.get(hostBytes)
val host = String(hostBytes)
val port = buffer.getInt()
val addressSize = buffer.getInt()
val addressBytes = ByteArray(addressSize)
buffer.get(addressBytes)
val address = String(addressBytes)
val mostSignificantBits = buffer.getLong()
val leastSignificantBits = buffer.getLong()
val uuid = UUID(mostSignificantBits, leastSignificantBits)
val sender = Sender(host, port, address, uuid)
return OutboundMessageBytesPacket(op, body, channel, sender)
}
}


override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as OutboundMessageBytesPacket
return op == other.op &&
body.contentEquals(other.body) &&
channel == other.channel &&
sender == other.sender
}

override fun hashCode(): Int {
var result = op
result = 31 * result + body.contentHashCode()
result = 31 * result + channel.hashCode()
result = 31 * result + (sender?.hashCode() ?: 0)
return result
}
}
24 changes: 0 additions & 24 deletions src/main/kotlin/cn/rtast/kwsify/entity/OutboundMessagePacket.kt

This file was deleted.

54 changes: 52 additions & 2 deletions src/main/kotlin/cn/rtast/kwsify/entity/PublishPacket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,58 @@

package cn.rtast.kwsify.entity

import java.nio.ByteBuffer

data class PublishPacket(
val op: Int,
val body: String,
val body: ByteArray,
val channel: String
)
) {
fun toByteArray(): ByteArray {
val totalLength = 4 + 4 + body.size + 4 + channel.toByteArray().size
val buffer = ByteBuffer.allocate(totalLength).apply {
putInt(op)
putInt(channel.toByteArray().size)
put(channel.toByteArray())
putInt(body.size)
put(body)
}
return buffer.array()
}

companion object {
fun fromByteArray(buffer: ByteBuffer): PublishPacket {
val op = buffer.int
val channelSize = buffer.int
val channelBytes = ByteArray(channelSize).apply {
buffer.get(this)
}
val channel = String(channelBytes)
val bodySize = buffer.int
val body = ByteArray(bodySize).apply {
buffer.get(this)
}
return PublishPacket(op, body, channel)
}
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as PublishPacket

if (op != other.op) return false
if (!body.contentEquals(other.body)) return false
if (channel != other.channel) return false

return true
}

override fun hashCode(): Int {
var result = op
result = 31 * result + body.contentHashCode()
result = 31 * result + channel.hashCode()
return result
}
}
Loading

0 comments on commit d03e4a7

Please sign in to comment.