Skip to content

Commit

Permalink
use DirectDispatcher instead of Fiber.yield
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorerlingsson committed Jan 9, 2024
1 parent 9f91e7b commit c13a0c7
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/lavinmq/launcher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ module LavinMQ
log_file = (path = @config.log_file) ? File.open(path, "a") : STDOUT
broadcast_backend = ::Log::BroadcastBackend.new
backend = if ENV.has_key?("JOURNAL_STREAM")
::Log::IOBackend.new(io: log_file, formatter: JournalLogFormat)
::Log::IOBackend.new(io: log_file, formatter: JournalLogFormat, dispatcher: ::Log::DirectDispatcher)
else
::Log::IOBackend.new(io: log_file, formatter: StdoutLogFormat)
end
Expand Down
19 changes: 9 additions & 10 deletions src/lavinmq/queue/message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,12 @@ module LavinMQ
end

private def load_deleted_from_disk
count = 0
ack_files = Dir.each(@data_dir).count { |f| f.starts_with?("acks.") }
count = 0u32
ack_files = 0u32
Dir.each(@data_dir) do |f|
ack_files += 1 if f.starts_with? "acks."
end

Dir.each_child(@data_dir) do |child|
next unless child.starts_with? "acks."
seg = child[5, 10].to_u32
Expand All @@ -284,7 +288,7 @@ module LavinMQ
end
@replicator.try &.register_file(file)
end
log_progress_and_yield("Loading acks (#{count}/#{ack_files})") if (count += 1) % 128 == 0
Log.info { "Loading acks (#{count}/#{ack_files})" } if (count += 1) % 128 == 0
@deleted[seg] = acked.sort! unless acked.empty?
end
end
Expand Down Expand Up @@ -341,7 +345,7 @@ module LavinMQ
end
mfile.pos = 4
mfile.unmap # will be mmap on demand
log_progress_and_yield("Loading stats (#{counter}/#{@segments.size})") if (counter += 1) % 100 == 0
Log.info { "Loading stats (#{counter}/#{@segments.size})" } if (counter += 1) % 128 == 0
@segment_msg_count[seg] = count
end
end
Expand All @@ -357,7 +361,7 @@ module LavinMQ
next if seg == current_seg # don't the delete the segment still being written to

if @segment_msg_count[seg] == @acks[seg].size // sizeof(UInt32)
log_progress_and_yield("Deleting unused segment #{seg}")
Log.info { "Deleting unused segment #{seg}" }
@segment_msg_count.delete seg
@deleted.delete seg
if ack = @acks.delete(seg)
Expand All @@ -384,11 +388,6 @@ module LavinMQ
end
end

private def log_progress_and_yield(message)
Log.info { message }
Fiber.yield
end

class ClosedError < ::Channel::ClosedError; end

class Error < Exception
Expand Down

0 comments on commit c13a0c7

Please sign in to comment.