Skip to content
This repository has been archived by the owner on Oct 5, 2023. It is now read-only.

Commit

Permalink
Add DM support
Browse files Browse the repository at this point in the history
  • Loading branch information
Slash Nephy committed Sep 2, 2018
1 parent be989eb commit 3e145d4
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/main/kotlin/jp/nephy/tweetstorm/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ class Config(override val json: JsonObject): JsonModel {

val syncListFollowing by json.byBool("sync_list_following") { false }
val syncListIncludeSelf by json.byBool("sync_list_include_self") { true }
val enableDirectMessage by json.byBool("enable_direct_message") { true }
val enableFriends by json.byBool("enable_friends") { true }
val markVia by json.byBool("mark_via") { false }
val markVote by json.byBool("mark_vote") { false }
val listInterval by json.byInt("list_timeline_refresh_sec") { 3 }
val homeInterval by json.byInt("home_timeline_refresh_sec") { 90 }
val userInterval by json.byInt("user_timeline_refresh_sec") { 3 }
val mentionInterval by json.byInt("mention_timeline_refresh_sec") { 45 }
val messageInterval by json.byInt("direct_message_refresh_sec") { 90 }
}
}
4 changes: 3 additions & 1 deletion src/main/kotlin/jp/nephy/tweetstorm/TaskManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ class TaskManager(initialStream: AuthenticatedStream): Closeable {
it.add(HomeTimeline(this))
}

// it.add(DirectMessage(this))
if (account.enableDirectMessage) {
it.add(DirectMessage(this))
}
// it.add(Activity(this))
// it.add(Delete(this))
it.add(Heartbeat(this))
Expand Down
4 changes: 4 additions & 0 deletions src/main/kotlin/jp/nephy/tweetstorm/builder/CustomBuilders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ fun newUser(builder: CustomUserBuilder.() -> Unit): User {
fun newUserEvent(type: UserEventType, builder: CustomUserEventBuilder.() -> Unit): UserStreamUserEvent {
return CustomUserEventBuilder(type).apply(builder).build()
}

fun newDirectMessage(builder: CustomDirectMessageBuilder.() -> Unit): DirectMessage {
return CustomDirectMessageBuilder().apply(builder).build()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package jp.nephy.tweetstorm.builder

import jp.nephy.jsonkt.jsonObject
import jp.nephy.jsonkt.set
import jp.nephy.penicillin.models.DirectMessage
import java.util.*

class CustomDirectMessageBuilder: JsonBuilder<DirectMessage> {
override val json = jsonObject(
"created_at" to null,
"entities" to jsonObject(),
"id" to null,
"id_str" to null,
"read" to false,
"recipient" to null,
"recipient_id" to null,
"recipient_id_str" to null,
"sender" to null,
"sender_id" to null,
"sender_id_str" to null,
"sender_screen_name" to null,
"text" to null
)

private var createdAt: Date? = null
fun createdAt(date: Date? = null) {
createdAt = date
}

fun read() {
json["read"] = true
}

private val recipientBuilder = CustomUserBuilder()
fun recipient(builder: CustomUserBuilder.() -> Unit) {
recipientBuilder.apply(builder)
}

private val senderBuilder = CustomUserBuilder()
fun sender(builder: CustomUserBuilder.() -> Unit) {
senderBuilder.apply(builder)
}

fun text(text: () -> Any?) {
json["text"] = text()?.toString().orEmpty()
}

override fun build(): DirectMessage {
json["created_at"] = createdAt.toCreatedAt()

val id = generateId()
json["id"] = id
json["id_str"] = id.toString()

val recipient = recipientBuilder.build()
json["recipient"] = recipient.json
json["recipient_id"] = recipient.id
json["recipient_id_str"] = recipient.idStr

val sender = senderBuilder.build()
json["sender"] = sender.json
json["sender_id"] = sender.id
json["sender_id_str"] = sender.idStr
json["sender_screen_name"] = sender.screenName

return DirectMessage(json)
}
}
51 changes: 50 additions & 1 deletion src/main/kotlin/jp/nephy/tweetstorm/task/DirectMessage.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,58 @@
package jp.nephy.tweetstorm.task

import jp.nephy.jsonkt.set
import jp.nephy.penicillin.core.PenicillinException
import jp.nephy.penicillin.core.TwitterErrorMessage
import jp.nephy.tweetstorm.TaskManager
import jp.nephy.tweetstorm.builder.newDirectMessage
import java.time.Duration
import java.time.Instant
import java.util.concurrent.TimeUnit

class DirectMessage(override val manager: TaskManager): FetchTask() {
private val sleepSec = manager.account.messageInterval.toLong()

private var lastId: Long? = null
override fun run() {
// TODO
try {
val messages = manager.twitter.directMessageEvent.list(count = 200).complete()
if (messages.result.events.isNotEmpty()) {
if (lastId != null) {
messages.result.events.filter { it.type == "message_create" }.filter { it.id.toLong() > lastId!! }.reversed().forEach {
manager.emit(newDirectMessage {
recipient {
json["id"] = it.messageCreate.target.recipientId.toLong()
}
sender {
json["id"] = it.messageCreate.senderId.toLong()
}
text { it.messageCreate.messageData.text }
json["entities"] = it.messageCreate.messageData.entities.json
})
}
}

lastId = messages.result.events.first().id.toLong()
}

if (messages.headers.rateLimit.hasLimit) {
val duration = Duration.between(Instant.now(), messages.headers.rateLimit.resetAt!!.toInstant())
if (messages.headers.rateLimit.remaining!! < 2) {
streamLogger.warn { "Rate limit: Mostly exceeded. Sleep ${duration.seconds} secs. (Reset at ${messages.headers.rateLimit.resetAt})" }
TimeUnit.SECONDS.sleep(duration.seconds)
} else if (duration.seconds > 3 && messages.headers.rateLimit.remaining!! * sleepSec.toDouble() / duration.seconds < 1) {
streamLogger.warn { "Rate limit: API calls (/${messages.request.url}) seem to be frequent than expected so consider adjusting `*_messages_refresh_sec` value in config.json. Sleep 10 secs. (${messages.headers.rateLimit.remaining}/${messages.headers.rateLimit.limit}, Reset at ${messages.headers.rateLimit.resetAt})" }
TimeUnit.SECONDS.sleep(10)
}
}
} catch (e: Exception) {
if (e is PenicillinException && e.error == TwitterErrorMessage.RateLimitExceeded) {
TimeUnit.SECONDS.sleep(10)
} else {
logger.error(e) { "An error occurred while getting direct messages." }
}
} finally {
TimeUnit.SECONDS.sleep(sleepSec)
}
}
}

0 comments on commit 3e145d4

Please sign in to comment.