diff --git a/api/src/main/kotlin/cn/rtast/kwsify/Base64.kt b/api/src/main/kotlin/cn/rtast/kwsify/Base64.kt new file mode 100644 index 0000000..aed7168 --- /dev/null +++ b/api/src/main/kotlin/cn/rtast/kwsify/Base64.kt @@ -0,0 +1,27 @@ +/* + * Copyright © 2025 RTAkland + * Author: RTAkland + * Date: 2025/1/3 + */ + +@file:Suppress("unused") + +package cn.rtast.kwsify + +import java.util.Base64 + +fun String.encodeToBase64(): String { + return Base64.getEncoder().encodeToString(this.toByteArray(Charsets.UTF_8)) +} + +fun ByteArray.encodeToBase64(): String { + return Base64.getEncoder().encodeToString(this) +} + +fun String.decodeToString(): String { + return String(Base64.getDecoder().decode(this), Charsets.UTF_8) +} + +fun String.decodeToByteArray(): ByteArray { + return Base64.getDecoder().decode(this) +} \ No newline at end of file diff --git a/api/src/main/kotlin/cn/rtast/kwsify/IOperation.kt b/api/src/main/kotlin/cn/rtast/kwsify/IOperation.kt index 14cae72..d315abc 100644 --- a/api/src/main/kotlin/cn/rtast/kwsify/IOperation.kt +++ b/api/src/main/kotlin/cn/rtast/kwsify/IOperation.kt @@ -24,6 +24,11 @@ interface IOperation { */ fun publish(channel: String, payload: String): Boolean + /** + * 发布二进制消息 + */ + fun publish(channel: String, payload: ByteArray): Boolean + /** * 订阅频道消息 */ diff --git a/api/src/main/kotlin/cn/rtast/kwsify/Kwsify.kt b/api/src/main/kotlin/cn/rtast/kwsify/Kwsify.kt index 84d5d2e..11d2c20 100644 --- a/api/src/main/kotlin/cn/rtast/kwsify/Kwsify.kt +++ b/api/src/main/kotlin/cn/rtast/kwsify/Kwsify.kt @@ -7,35 +7,34 @@ package cn.rtast.kwsify -import cn.rtast.kwsify.entity.OutboundMessagePacket -import cn.rtast.kwsify.entity.PublishPacket -import cn.rtast.kwsify.entity.SubscribePacket -import cn.rtast.kwsify.entity.UnsubscribePacket +import cn.rtast.kwsify.entity.* import cn.rtast.kwsify.enums.OPCode -import cn.rtast.kwsify.util.fromJson -import cn.rtast.kwsify.util.send +import cn.rtast.kwsify.util.toJson import org.java_websocket.client.WebSocketClient import org.java_websocket.handshake.ServerHandshake import java.net.URI +import java.nio.ByteBuffer import java.util.* class Kwsify(private val address: String) : IOperation { private lateinit var websocket: WebSocketClient private val subscribers = mutableMapOf>() - private val reconnectInterval = 5L override fun connect(channel: String, broadcastSelf: Boolean) { websocket = object : WebSocketClient(URI(address)) { override fun onOpen(handshakedata: ServerHandshake) { - val authPacket = SubscribePacket(OPCode.JOIN, UUID.randomUUID(), channel, broadcastSelf) + val authPacket = SubscribePacket(OPCode.JOIN, UUID.randomUUID(), channel, broadcastSelf).toByteArray() websocket.send(authPacket) } override fun onMessage(message: String) { - val inboundPacket = message.fromJson() - val channel = inboundPacket.channel + } + + override fun onMessage(bytes: ByteBuffer) { + val packet = OutboundMessageBytesPacket.fromByteArray(bytes.array()) + val channel = packet.channel subscribers[channel]?.forEach { subscriber -> - subscriber.onMessage(channel, inboundPacket.body, inboundPacket) + subscriber.onMessage(channel, bytes.array(), packet) } } @@ -56,10 +55,12 @@ class Kwsify(private val address: String) : IOperation { } override fun publish(channel: String, payload: String): Boolean { - val packet = PublishPacket(OPCode.PUBLISH, payload, channel) - subscribers[channel]?.forEach { subscriber -> - websocket.send(packet) - } + return this.publish(channel, payload.toByteArray()) + } + + override fun publish(channel: String, payload: ByteArray): Boolean { + val packet = PublishPacket(OPCode.PUBLISH, payload, channel).toByteArray() + websocket.send(packet) return true } @@ -76,7 +77,7 @@ class Kwsify(private val address: String) : IOperation { try { if (subscribers.containsKey(channel)) { subscribers.remove(channel) - val packet = UnsubscribePacket(OPCode.EXIT_CHANNEL, channel) + val packet = UnsubscribePacket(OPCode.EXIT_CHANNEL, channel).toByteArray() websocket.send(packet) return true } diff --git a/api/src/main/kotlin/cn/rtast/kwsify/Subscriber.kt b/api/src/main/kotlin/cn/rtast/kwsify/Subscriber.kt index 3ddb01f..3269cd9 100644 --- a/api/src/main/kotlin/cn/rtast/kwsify/Subscriber.kt +++ b/api/src/main/kotlin/cn/rtast/kwsify/Subscriber.kt @@ -7,13 +7,13 @@ package cn.rtast.kwsify -import cn.rtast.kwsify.entity.OutboundMessagePacket +import cn.rtast.kwsify.entity.OutboundMessageBytesPacket interface Subscriber { /** - * 接收到消息时 + * 接收到二进制消息时 */ - fun onMessage(channel: String, payload: String, packet: OutboundMessagePacket) + fun onMessage(channel: String, payload: ByteArray, packet: OutboundMessageBytesPacket) {} /** * websocket连接断开时 diff --git a/api/src/test/kotlin/test/kwsify/TestClient.kt b/api/src/test/kotlin/test/kwsify/TestClient.kt index 3b50381..ffcc34f 100644 --- a/api/src/test/kotlin/test/kwsify/TestClient.kt +++ b/api/src/test/kotlin/test/kwsify/TestClient.kt @@ -9,15 +9,14 @@ package test.kwsify import cn.rtast.kwsify.Kwsify import cn.rtast.kwsify.Subscriber -import cn.rtast.kwsify.entity.OutboundMessagePacket +import cn.rtast.kwsify.entity.OutboundMessageBytesPacket fun main() { val wsify = Kwsify("ws://127.0.0.1:8080") wsify.subscribe("test", true, object : Subscriber { - override fun onMessage(channel: String, payload: String, packet: OutboundMessagePacket) { - println(packet.body) - Thread.sleep(500L) - wsify.publish("test", "114514") + + override fun onMessage(channel: String, payload: ByteArray, packet: OutboundMessageBytesPacket) { + println(String(packet.body)) } override fun onClosed(channel: String) { @@ -26,5 +25,8 @@ fun main() { } }) Thread.sleep(1000L) - wsify.publish("test", "114514") + while (true) { + wsify.publish("test", "114514") + Thread.sleep(1000) + } } \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 94e3c2e..dffa14a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,8 @@ kotlin.code.style=official appVersion=1.1.0 -libVersion=1.1.1 \ No newline at end of file +libVersion=1.1.1 + +#systemProp.http.proxyHost=127.0.0.1 +#systemProp.http.proxyPort=12334 +#systemProp.https.proxyHost=127.0.0.1 +#systemProp.https.proxyPort=12334 \ No newline at end of file diff --git a/src/main/kotlin/cn/rtast/kwsify/entity/OPCodePacket.kt b/src/main/kotlin/cn/rtast/kwsify/entity/OPCodePacket.kt new file mode 100644 index 0000000..f50d3ec --- /dev/null +++ b/src/main/kotlin/cn/rtast/kwsify/entity/OPCodePacket.kt @@ -0,0 +1,36 @@ +/* + * Copyright © 2025 RTAkland + * Author: RTAkland + * Date: 2025/1/3 + */ + + +package cn.rtast.kwsify.entity + +import java.nio.ByteBuffer + +data class OPCodePacket( + val op: Int, + val channel: String, +) { + fun toByteArray(): ByteArray { + val buffer = ByteBuffer.allocate(4 + 4 + channel.toByteArray().size) + return buffer.apply { + putInt(op) + putInt(channel.toByteArray().size) + put(channel.toByteArray()) + }.array() + } + + companion object { + fun fromByteArray(buffer: ByteBuffer): OPCodePacket { + val op = buffer.int + val channelSize = buffer.int + val channelBytes = ByteArray(channelSize).apply { + buffer.get(this) + } + val channel = String(channelBytes) + return OPCodePacket(op, channel) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/cn/rtast/kwsify/entity/OutboundMessageBytesPacket.kt b/src/main/kotlin/cn/rtast/kwsify/entity/OutboundMessageBytesPacket.kt new file mode 100644 index 0000000..f612c20 --- /dev/null +++ b/src/main/kotlin/cn/rtast/kwsify/entity/OutboundMessageBytesPacket.kt @@ -0,0 +1,94 @@ +/* + * Copyright © 2025 RTAkland + * Author: RTAkland + * Date: 2025/1/3 + */ + + +package cn.rtast.kwsify.entity + +import java.nio.ByteBuffer +import java.util.* + +data class OutboundMessageBytesPacket( + val op: Int, + val body: ByteArray, + val channel: String, + val sender: Sender +) { + data class Sender( + val host: String, + val port: Int, + val address: String, + val uuid: UUID + ) + + + fun toByteArray(): ByteArray { + val hostSize = sender.host.toByteArray().size + val addressSize = sender.address.toByteArray().size + val channelSize = channel.toByteArray().size + val totalLength = body.size + channelSize + hostSize + addressSize + 8 + 8 + 4 + 4 + 4 + 4 + 8 + val buffer = ByteBuffer.allocate(totalLength) + buffer.putInt(op) // op: 4 bytes + buffer.putInt(body.size) // body size: 4 bytes + buffer.put(body) // body data + buffer.putInt(channelSize) // channel size: 4 bytes + buffer.put(channel.toByteArray()) // channel data + buffer.putInt(hostSize) // host size: 4 bytes + buffer.put(sender.host.toByteArray()) // host data + buffer.putInt(sender.port) // port: 4 bytes + buffer.putInt(addressSize) // address size: 4 bytes + buffer.put(sender.address.toByteArray()) // address data + buffer.putLong(sender.uuid.mostSignificantBits) // UUID most significant bits: 8 bytes + buffer.putLong(sender.uuid.leastSignificantBits) // UUID least significant bits: 8 bytes + return buffer.array() + } + + companion object { + fun fromByteArray(bytes: ByteArray): OutboundMessageBytesPacket { + val buffer = ByteBuffer.wrap(bytes) + val op = buffer.getInt() + val bodySize = buffer.getInt() + val body = ByteArray(bodySize) + buffer.get(body) + val channelSize = buffer.getInt() + val channelBytes = ByteArray(channelSize) + buffer.get(channelBytes) + val channel = String(channelBytes) + val hostSize = buffer.getInt() + val hostBytes = ByteArray(hostSize) + buffer.get(hostBytes) + val host = String(hostBytes) + val port = buffer.getInt() + val addressSize = buffer.getInt() + val addressBytes = ByteArray(addressSize) + buffer.get(addressBytes) + val address = String(addressBytes) + val mostSignificantBits = buffer.getLong() + val leastSignificantBits = buffer.getLong() + val uuid = UUID(mostSignificantBits, leastSignificantBits) + val sender = Sender(host, port, address, uuid) + return OutboundMessageBytesPacket(op, body, channel, sender) + } + } + + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + other as OutboundMessageBytesPacket + return op == other.op && + body.contentEquals(other.body) && + channel == other.channel && + sender == other.sender + } + + override fun hashCode(): Int { + var result = op + result = 31 * result + body.contentHashCode() + result = 31 * result + channel.hashCode() + result = 31 * result + (sender?.hashCode() ?: 0) + return result + } +} diff --git a/src/main/kotlin/cn/rtast/kwsify/entity/OutboundMessagePacket.kt b/src/main/kotlin/cn/rtast/kwsify/entity/OutboundMessagePacket.kt deleted file mode 100644 index 7a80da8..0000000 --- a/src/main/kotlin/cn/rtast/kwsify/entity/OutboundMessagePacket.kt +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright © 2024 RTAkland - * Author: RTAkland - * Date: 2024/11/30 - */ - - -package cn.rtast.kwsify.entity - -import java.util.UUID - -data class OutboundMessagePacket( - val op: Int, - val body: String, - val channel: String, - val sender: Sender? -) { - data class Sender( - val host: String, - val port: Int, - val address: String, - val uuid: UUID - ) -} \ No newline at end of file diff --git a/src/main/kotlin/cn/rtast/kwsify/entity/PublishPacket.kt b/src/main/kotlin/cn/rtast/kwsify/entity/PublishPacket.kt index a327fa0..7501442 100644 --- a/src/main/kotlin/cn/rtast/kwsify/entity/PublishPacket.kt +++ b/src/main/kotlin/cn/rtast/kwsify/entity/PublishPacket.kt @@ -7,8 +7,58 @@ package cn.rtast.kwsify.entity +import java.nio.ByteBuffer + data class PublishPacket( val op: Int, - val body: String, + val body: ByteArray, val channel: String -) \ No newline at end of file +) { + fun toByteArray(): ByteArray { + val totalLength = 4 + 4 + body.size + 4 + channel.toByteArray().size + val buffer = ByteBuffer.allocate(totalLength).apply { + putInt(op) + putInt(channel.toByteArray().size) + put(channel.toByteArray()) + putInt(body.size) + put(body) + } + return buffer.array() + } + + companion object { + fun fromByteArray(buffer: ByteBuffer): PublishPacket { + val op = buffer.int + val channelSize = buffer.int + val channelBytes = ByteArray(channelSize).apply { + buffer.get(this) + } + val channel = String(channelBytes) + val bodySize = buffer.int + val body = ByteArray(bodySize).apply { + buffer.get(this) + } + return PublishPacket(op, body, channel) + } + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as PublishPacket + + if (op != other.op) return false + if (!body.contentEquals(other.body)) return false + if (channel != other.channel) return false + + return true + } + + override fun hashCode(): Int { + var result = op + result = 31 * result + body.contentHashCode() + result = 31 * result + channel.hashCode() + return result + } +} \ No newline at end of file diff --git a/src/main/kotlin/cn/rtast/kwsify/entity/SubscribePacket.kt b/src/main/kotlin/cn/rtast/kwsify/entity/SubscribePacket.kt index a5e5579..8c484bf 100644 --- a/src/main/kotlin/cn/rtast/kwsify/entity/SubscribePacket.kt +++ b/src/main/kotlin/cn/rtast/kwsify/entity/SubscribePacket.kt @@ -7,11 +7,42 @@ package cn.rtast.kwsify.entity -import java.util.UUID +import cn.rtast.kwsify.util.getBoolean +import cn.rtast.kwsify.util.putBoolean +import java.nio.ByteBuffer +import java.util.* data class SubscribePacket( val op: Int, val uuid: UUID, val channel: String, val broadcastSelf: Boolean -) \ No newline at end of file +) { + fun toByteArray(): ByteArray { + val totalLength = 4 + 16 + 4 + channel.toByteArray().size + 1 + val buffer = ByteBuffer.allocate(totalLength).apply { + putInt(op) + putInt(channel.toByteArray().size) + put(channel.toByteArray()) + putBoolean(broadcastSelf) + putLong(uuid.mostSignificantBits) + putLong(uuid.leastSignificantBits) + } + return buffer.array() + } + + companion object { + fun fromByteArray(buffer: ByteBuffer): SubscribePacket { + val op = buffer.getInt() + val channelSize = buffer.getInt() + val channelBytes = ByteArray(channelSize) + buffer.get(channelBytes) + val channel = String(channelBytes) + val broadcastSelf = buffer.getBoolean() + val mostSignificantBits = buffer.getLong() + val leastSignificantBits = buffer.getLong() + val uuid = UUID(mostSignificantBits, leastSignificantBits) + return SubscribePacket(op, uuid, channel, broadcastSelf) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/cn/rtast/kwsify/entity/UnsubscribePacket.kt b/src/main/kotlin/cn/rtast/kwsify/entity/UnsubscribePacket.kt index 95c8221..4d03a04 100644 --- a/src/main/kotlin/cn/rtast/kwsify/entity/UnsubscribePacket.kt +++ b/src/main/kotlin/cn/rtast/kwsify/entity/UnsubscribePacket.kt @@ -7,7 +7,22 @@ package cn.rtast.kwsify.entity +import java.nio.ByteBuffer + data class UnsubscribePacket( val op: Int, val channel: String, -) \ No newline at end of file +) { + + fun toByteArray(): ByteArray { + val totalLength = 4 + 4 + channel.toByteArray().size + val buffer = ByteBuffer.allocate(totalLength).apply { + putInt(op) + putInt(channel.toByteArray().size) + put(channel.toByteArray()) + } + return buffer.array() + } + + companion object {} +} \ No newline at end of file diff --git a/src/main/kotlin/cn/rtast/kwsify/util/Binary.kt b/src/main/kotlin/cn/rtast/kwsify/util/Binary.kt new file mode 100644 index 0000000..ab33ec5 --- /dev/null +++ b/src/main/kotlin/cn/rtast/kwsify/util/Binary.kt @@ -0,0 +1,18 @@ +/* + * Copyright © 2025 RTAkland + * Author: RTAkland + * Date: 2025/1/3 + */ + + +package cn.rtast.kwsify.util + +import java.nio.ByteBuffer + +fun ByteBuffer.putBoolean(value: Boolean) { + put(if (value) 1.toByte() else 0.toByte()) +} + +fun ByteBuffer.getBoolean(): Boolean { + return get() == 1.toByte() +} \ No newline at end of file diff --git a/src/main/kotlin/cn/rtast/kwsify/util/Websocket.kt b/src/main/kotlin/cn/rtast/kwsify/util/Websocket.kt deleted file mode 100644 index 22b6baa..0000000 --- a/src/main/kotlin/cn/rtast/kwsify/util/Websocket.kt +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright © 2024 RTAkland - * Author: RTAkland - * Date: 2024/11/30 - */ - - -package cn.rtast.kwsify.util - -import org.java_websocket.WebSocket - -fun WebSocket.send(payload: Any) { - this.send(payload.toJson()) -} \ No newline at end of file diff --git a/src/main/kotlin/cn/rtast/kwsify/util/WebsocketServer.kt b/src/main/kotlin/cn/rtast/kwsify/util/WebsocketServer.kt index 68343c2..fb87530 100644 --- a/src/main/kotlin/cn/rtast/kwsify/util/WebsocketServer.kt +++ b/src/main/kotlin/cn/rtast/kwsify/util/WebsocketServer.kt @@ -8,39 +8,51 @@ package cn.rtast.kwsify.util import cn.rtast.kwsify.entity.ConnectionState -import cn.rtast.kwsify.entity.OutboundMessagePacket +import cn.rtast.kwsify.entity.OPCodePacket +import cn.rtast.kwsify.entity.OutboundMessageBytesPacket +import cn.rtast.kwsify.entity.PublishPacket import cn.rtast.kwsify.entity.SubscribePacket import cn.rtast.kwsify.enums.OPCode import org.java_websocket.WebSocket import org.java_websocket.handshake.ClientHandshake import org.java_websocket.server.WebSocketServer import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.util.* class WebsocketServer(private val port: Int) : WebSocketServer(InetSocketAddress(port)) { private val connectionState = mutableListOf() - private fun getSender(conn: WebSocket): OutboundMessagePacket.Sender { + private fun getSender(conn: WebSocket): OutboundMessageBytesPacket.Sender { val state = connectionState.find { it.websocket == conn }!! val address = state.websocket.remoteSocketAddress.address.toString() val hostName = state.websocket.remoteSocketAddress.hostName val port = state.websocket.remoteSocketAddress.port - return OutboundMessagePacket.Sender(hostName, port, address, state.uuid) + return OutboundMessageBytesPacket.Sender(hostName, port, address, state.uuid) + } + + private fun getSystemSender(): OutboundMessageBytesPacket.Sender { + return OutboundMessageBytesPacket.Sender("system", 0, "system", UUID.randomUUID()) } private fun nullChannelPacket(conn: WebSocket) { val nullChannelPacket = - OutboundMessagePacket(OPCode.SYSTEM, "channel must not be null!", "_system", sender = this.getSender(conn)) + OutboundMessageBytesPacket( + OPCode.SYSTEM, + "channel must not be null!".toByteArray(), + "_system", + sender = this.getSender(conn) + ).toByteArray() conn.send(nullChannelPacket) } override fun onOpen(conn: WebSocket, handshake: ClientHandshake) { - val welcomePacket = OutboundMessagePacket( + val welcomePacket = OutboundMessageBytesPacket( OPCode.SYSTEM, - "send `op=3`, fill channel filed and uuid filed to join channel", - "_system", - null - ) + "send `op=3`, fill channel filed and uuid filed to join channel".toByteArray(), + "_system", getSystemSender() + ).toByteArray() conn.send(welcomePacket) println("New connection connected(${conn.remoteSocketAddress})(Unauthenticated)") } @@ -51,24 +63,27 @@ class WebsocketServer(private val port: Int) : WebSocketServer(InetSocketAddress } override fun onMessage(conn: WebSocket, message: String) { + } + + override fun onMessage(conn: WebSocket, message: ByteBuffer) { try { - val packet = message.fromJson() + val packet = OPCodePacket.fromByteArray(message.duplicate()) val channel = packet.channel when (packet.op) { OPCode.JOIN -> { - val authPacket = message.fromJson() + val authPacket = SubscribePacket.fromByteArray(message.duplicate()) if (!connectionState.any { it.websocket == conn }) { connectionState.add( ConnectionState(authPacket.channel, conn, authPacket.broadcastSelf, authPacket.uuid) ) } else { val systemPacket = - OutboundMessagePacket( + OutboundMessageBytesPacket( OPCode.SYSTEM, - "You already joined the channel ($channel)", + "You already joined the channel ($channel)".toByteArray(), "_system", sender = this.getSender(conn) - ) + ).toByteArray() conn.send(systemPacket) } println("New authed connection joined the channel(${authPacket.channel}) with UUID(${authPacket.uuid})") @@ -81,14 +96,15 @@ class WebsocketServer(private val port: Int) : WebSocketServer(InetSocketAddress } OPCode.PUBLISH -> { + val packet = PublishPacket.fromByteArray(message.duplicate()) connectionState.forEach { if (it.websocket != conn || it.broadcastSelf) { if (it.channel == channel) { val publishMessagePacket = - OutboundMessagePacket( + OutboundMessageBytesPacket( OPCode.MESSAGE, packet.body, channel, sender = this.getSender(conn) - ) + ).toByteArray() it.websocket.send(publishMessagePacket) } } @@ -96,14 +112,22 @@ class WebsocketServer(private val port: Int) : WebSocketServer(InetSocketAddress } OPCode.EXIT_CHANNEL -> { + val connection = connectionState.find { it.websocket == conn }!! if (connectionState.any { it.websocket == conn }) connectionState.removeIf { it.websocket == conn } else nullChannelPacket(conn) + println("Connection unsubscribed(${conn.remoteSocketAddress}, ${connection.uuid}: ${connection.channel})") } } } catch (e: Exception) { e.printStackTrace() - val outboundPacket = OutboundMessagePacket(OPCode.SYSTEM, e.message.toString(), "_system", null) + val outboundPacket = + OutboundMessageBytesPacket( + OPCode.SYSTEM, + e.message.toString().toByteArray(), + "_system", + getSystemSender() + ).toByteArray() conn.send(outboundPacket) } }