Skip to content

Commit

Permalink
make stable
Browse files Browse the repository at this point in the history
  • Loading branch information
RTAkland committed Dec 1, 2024
1 parent dee97c7 commit 3477d88
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 64 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
strategy:
matrix:
java:
- 11
- 8
os:
- ubuntu-latest
runs-on: ${{ matrix.os }}
Expand Down
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ repositories {

```kotlin
dependencies {
implementation("cn.rtast.kwsify:api:${version}") // 替换成最新版本
implementation("cn.rtast.kwsify:api:1.0.0") // 替换成最新版本
}
```

Expand All @@ -69,8 +69,9 @@ dependencies {
```kotlin
fun main() {
val wsify = Kwsify("ws://127.0.0.1:8989", "test")
wsify.subscribe(object : Subscriber {
override fun onMessage(channel: String, payload: String) {
// testChannel为需要订阅的频道, false表示是否接收自己publish的消息
wsify.subscribe("testChannel", false, object : Subscriber {
override fun onMessage(channel: String, payload: String, packet: Packet) {
println(payload)
}
})
Expand Down
12 changes: 6 additions & 6 deletions api/src/main/kotlin/cn/rtast/kwsify/IOperation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,30 @@ interface IOperation {
/**
* 连接Websocket
*/
fun connect()
fun connect(channel: String, broadcastSelf: Boolean)

/**
* 发布消息
*/
fun publish(payload: String): Boolean
fun publish(channel: String, payload: String): Boolean

/**
* 订阅频道消息
*/
fun subscribe(subscriber: Subscriber): Boolean
fun subscribe(channel: String, broadcastSelf: Boolean, subscriber: Subscriber): Boolean

/**
* 取消订阅频道消息
*/
fun unsubscribe(): Boolean
fun unsubscribe(channel: String): Boolean

/**
* 发送订阅频道的数据包
*/
fun subscribePacket()
fun subscribePacket(channel: String, broadcastSelf: Boolean)

/**
* 发送取消订阅频道的数据包
*/
fun unsubscribePacket()
fun unsubscribePacket(channel: String)
}
44 changes: 24 additions & 20 deletions api/src/main/kotlin/cn/rtast/kwsify/Kwsify.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,31 @@ package cn.rtast.kwsify
import cn.rtast.kwsify.entity.Packet
import cn.rtast.kwsify.enums.OPCode
import cn.rtast.kwsify.util.fromJson
import cn.rtast.kwsify.util.send
import org.java_websocket.client.WebSocketClient
import org.java_websocket.handshake.ServerHandshake
import java.net.URI

class Kwsify(private val address: String, private val channel: String) : IOperation {

init {
this.connect()
}

class Kwsify(private val address: String) : IOperation {
private lateinit var websocket: WebSocketClient
private val subscribers = mutableMapOf<String, MutableList<Subscriber>>()

override fun connect() {
override fun connect(channel: String, broadcastSelf: Boolean) {
websocket = object : WebSocketClient(URI(address)) {
override fun onOpen(handshakedata: ServerHandshake) {
subscribePacket()
subscribePacket(channel, broadcastSelf)
}

override fun onMessage(message: String) {
val inboundPacket = message.fromJson<Packet>()
val channel = inboundPacket.channel!!
subscribers[channel]?.forEach { subscriber ->
subscriber.onMessage(channel, inboundPacket.body)
subscriber.onMessage(channel, inboundPacket.body, inboundPacket)
}
}

override fun onClose(code: Int, reason: String?, remote: Boolean) {
unsubscribe(channel)
}

override fun onError(ex: Exception) {
Expand All @@ -46,37 +43,44 @@ class Kwsify(private val address: String, private val channel: String) : IOperat
}.apply { connect() }
}

override fun publish(payload: String): Boolean {
override fun publish(channel: String, payload: String): Boolean {
val packet = Packet(OPCode.PUBLISH, payload, channel)
subscribers[channel]?.forEach { subscriber ->
websocket.send(packet)
}
return true
}

override fun subscribe(subscriber: Subscriber): Boolean {
override fun subscribe(channel: String, broadcastSelf: Boolean, subscriber: Subscriber): Boolean {
this.connect(channel, broadcastSelf)
if (!subscribers.containsKey(channel)) {
subscribers[channel] = mutableListOf()
}
subscribers[channel]?.add(subscriber)
return true
}

override fun unsubscribe(): Boolean {
if (subscribers.containsKey(channel)) {
subscribers.remove(channel)
unsubscribePacket()
return true
override fun unsubscribe(channel: String): Boolean {
try {

if (subscribers.containsKey(channel)) {
subscribers.remove(channel)
unsubscribePacket(channel)
return true
}
return false
} catch (e: Exception) {
e.printStackTrace()
return false
}
return false
}

override fun subscribePacket() {
val packet = Packet(OPCode.JOIN, "_subscribe", channel)
override fun subscribePacket(channel: String, broadcastSelf: Boolean) {
val packet = Packet(OPCode.JOIN, "_subscribe", channel, broadcastSelf)
websocket.send(packet)
}

override fun unsubscribePacket() {
override fun unsubscribePacket(channel: String) {
val packet = Packet(OPCode.EXIT_CHANNEL, "_unsubscribe", channel)
websocket.send(packet)
}
Expand Down
4 changes: 3 additions & 1 deletion api/src/main/kotlin/cn/rtast/kwsify/Subscriber.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

package cn.rtast.kwsify

import cn.rtast.kwsify.entity.Packet

interface Subscriber {
/**
* 接收到消息时
*/
fun onMessage(channel: String, payload: String)
fun onMessage(channel: String, payload: String, packet: Packet)
}
15 changes: 0 additions & 15 deletions api/src/main/kotlin/cn/rtast/kwsify/Websocket.kt

This file was deleted.

13 changes: 7 additions & 6 deletions api/src/test/kotlin/test/kwsify/TestClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@ package test.kwsify

import cn.rtast.kwsify.Kwsify
import cn.rtast.kwsify.Subscriber
import cn.rtast.kwsify.entity.Packet

fun main() {
val wsify = Kwsify("ws://127.0.0.1:8989", "test")
wsify.subscribe(object : Subscriber {
override fun onMessage(channel: String, payload: String) {
println(payload)
val wsify = Kwsify("ws://127.0.0.1:8080")
wsify.subscribe("test", true, object : Subscriber {
override fun onMessage(channel: String, payload: String, packet: Packet) {
println(packet.body)
}
})
Thread.sleep(2000)
while (true) {
wsify.publish("test")
wsify.publish("test", "114514")
Thread.sleep(1000)
}
wsify.unsubscribe()
wsify.unsubscribe("test")
}
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
kotlin.code.style=official
appVersion=1.0.0
libVersion=0.0.1
appVersion=1.1.0
libVersion=1.0.0
12 changes: 10 additions & 2 deletions src/main/kotlin/cn/rtast/kwsify/entity/Packet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,13 @@ package cn.rtast.kwsify.entity
data class Packet(
val op: Int,
val body: String,
val channel: String?
)
val channel: String?,
val broadcastSelf: Boolean? = null,
val sender: Sender? = null
) {
data class Sender(
val host: String,
val port: Int,
val address: String
)
}
14 changes: 14 additions & 0 deletions src/main/kotlin/cn/rtast/kwsify/util/Websocket.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright © 2024 RTAkland
* Author: RTAkland
* Date: 2024/11/30
*/


package cn.rtast.kwsify.util

import org.java_websocket.WebSocket

fun WebSocket.send(payload: Any) {
this.send(payload.toJson())
}
45 changes: 37 additions & 8 deletions src/main/kotlin/cn/rtast/kwsify/util/WebsocketServer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,28 @@ import java.net.InetSocketAddress

class WebsocketServer(private val port: Int) : WebSocketServer(InetSocketAddress(port)) {

private val connectionState = mutableMapOf<WebSocket, String>()
private val connectionState = mutableMapOf<WebSocket, Map<String, Boolean>>()

private fun getSender(conn: WebSocket): Packet.Sender {
val address = conn.remoteSocketAddress.address.toString()
val hostName = conn.remoteSocketAddress.hostName
val port = conn.remoteSocketAddress.port
return Packet.Sender(hostName, port, address)
}

private fun nullChannelPacket(conn: WebSocket) {
val nullChannelPacket = Packet(OPCode.SYSTEM, "channel must not be null!", "_system").toJson()
val nullChannelPacket =
Packet(OPCode.SYSTEM, "channel must not be null!", "_system", sender = this.getSender(conn))
conn.send(nullChannelPacket)
}

override fun onOpen(conn: WebSocket, handshake: ClientHandshake) {
val welcomePacket = Packet(OPCode.SYSTEM, "send OPCode=3, set channel to join channel", "_system").toJson()
val welcomePacket = Packet(
OPCode.SYSTEM,
"send OPCode=3, set channel to join channel",
"_system",
sender = this.getSender(conn)
)
conn.send(welcomePacket)
}

Expand All @@ -35,15 +48,21 @@ class WebsocketServer(private val port: Int) : WebSocketServer(InetSocketAddress
override fun onMessage(conn: WebSocket, message: String) {
try {
val packet = message.fromJson<Packet>()
val broadcastSelf = packet.broadcastSelf == true
val channel = packet.channel
when (packet.op) {
OPCode.JOIN -> {
if (channel == null) nullChannelPacket(conn) else {
if (!connectionState.containsKey(conn)) {
connectionState[conn] = channel
connectionState[conn] = mapOf(channel to broadcastSelf)
} else {
val systemPacket =
Packet(OPCode.SYSTEM, "You already joined the channel ($channel)", "_system").toJson()
Packet(
OPCode.SYSTEM,
"You already joined the channel ($channel)",
"_system",
sender = this.getSender(conn)
)
conn.send(systemPacket)
}
}
Expand All @@ -56,9 +75,19 @@ class WebsocketServer(private val port: Int) : WebSocketServer(InetSocketAddress
}

OPCode.PUBLISH -> {
connectionState.filterKeys { it != conn }.filter { it.value == channel }.forEach {
val publishMessagePacket = Packet(OPCode.MESSAGE, packet.body, channel).toJson()
it.key.send(publishMessagePacket)
connectionState.forEach { (webSocket, channelMap) ->
channelMap.forEach { (channelKey, shouldBroadcast) ->
if (channelKey == channel) {
if (shouldBroadcast || webSocket != conn) {
val publishMessagePacket =
Packet(
OPCode.MESSAGE, packet.body, channel,
sender = this.getSender(conn)
)
webSocket.send(publishMessagePacket)
}
}
}
}
}

Expand Down

0 comments on commit 3477d88

Please sign in to comment.