Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oauth2 support #861

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
12 changes: 12 additions & 0 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,22 @@ shards:
git: https://github.com/cloudamqp/amqp-client.cr.git
version: 1.2.8

bindata:
git: https://github.com/spider-gazelle/bindata.git
version: 2.1.0

jwt:
git: https://github.com/crystal-community/jwt.git
version: 1.6.1

lz4:
git: https://github.com/84codes/lz4.cr.git
version: 1.0.0+git.commit.96d714f7593c66ca7425872fd26c7b1286806d3d

openssl_ext:
git: https://github.com/spider-gazelle/openssl_ext.git
version: 2.4.4

systemd:
git: https://github.com/84codes/systemd.cr.git
version: 2.0.0
Expand Down
2 changes: 2 additions & 0 deletions shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ dependencies:
github: 84codes/systemd.cr
lz4:
github: 84codes/lz4.cr
jwt:
github: crystal-community/jwt

development_dependencies:
ameba:
Expand Down
6 changes: 6 additions & 0 deletions spec/auth_sepc.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
require "./spec_helper"

describe LavinMQ::AuthHandler do


end
36 changes: 36 additions & 0 deletions spec/cache_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# require "./spec_helper"

# describe LavinMQ::Cache do
# cache = LavinMQ::Cache(String, String).new(1.seconds)

# it "set key" do
# cache.set("key1", "allow").should eq "allow"
# end

# it "get key" do
# cache.set("keyget", "deny")
# cache.get?("keyget").should eq "deny"
# end

# it "invalid cache after 10 second" do
# cache.set("keyinvalid", "expired")
# sleep(2.seconds)
# cache.get?("keyinvalid").should be_nil
# end

# it "delete key" do
# cache.set("keydelete", "deleted")
# cache.delete("keydelete")
# cache.get?("keydelete").should be_nil
# end

# it "cleanup expired entry" do
# cache.set("clean1", "expired1")
# cache.set("clean2", "expired2")
# cache.set("clean3", "valid", 10.seconds)
# sleep(2.seconds)
# cache.get?("clean1").should be_nil
# cache.get?("clean2").should be_nil
# cache.get?("clean3").should eq "valid"
# end
# end
20 changes: 8 additions & 12 deletions src/lavinmq/amqp/connection_factory.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@ require "../version"
require "../logger"
require "./client"
require "../client/connection_factory"
require "../auth/handlers/basic_auth"
require "../auth/handlers/oauth2"
require "../auth/handlers/http"

module LavinMQ
module AMQP
class ConnectionFactory < LavinMQ::ConnectionFactory
Log = LavinMQ::Log.for "amqp.connection_factory"

def start(socket, connection_info, vhosts, users) : Client?
def start(socket, connection_info, vhosts, users, auth_chain) : Client?
remote_address = connection_info.src
socket.read_timeout = 15.seconds
metadata = ::Log::Metadata.build({address: remote_address.to_s})
logger = Logger.new(Log, metadata)
if confirm_header(socket, logger)
if start_ok = start(socket, logger)
if user = authenticate(socket, remote_address, users, start_ok, logger)
if user = authenticate(socket, remote_address, users, start_ok, logger, auth_chain)
if tune_ok = tune(socket, logger)
if vhost = open(socket, vhosts, user, logger)
socket.read_timeout = heartbeat_timeout(tune_ok)
Expand Down Expand Up @@ -47,7 +50,7 @@ module LavinMQ
elsif proto != AMQP::PROTOCOL_START_0_9_1 && proto != AMQP::PROTOCOL_START_0_9
socket.write AMQP::PROTOCOL_START_0_9_1.to_slice
socket.flush
log.warn { "Unexpected protocol #{String.new(proto.to_unsafe, count).inspect}, closing socket" }
log.warn { "Unexpected protocol '#{String.new(proto.to_slice)}', closing socket" }
false
else
true
Expand Down Expand Up @@ -100,17 +103,10 @@ module LavinMQ
end
end

def authenticate(socket, remote_address, users, start_ok, log)
def authenticate(socket, remote_address, users, start_ok, log, auth_chain)
username, password = credentials(start_ok)
user = users[username]?
return user if user && user.password && user.password.not_nil!.verify(password) &&
guest_only_loopback?(remote_address, user)

if user.nil?
log.warn { "User \"#{username}\" not found" }
else
log.warn { "Authentication failure for user \"#{username}\"" }
end
return user if user && auth_chain.authenticate(username, password) && guest_only_loopback?(remote_address, user)
props = start_ok.client_properties
if capabilities = props["capabilities"]?.try &.as?(AMQP::Table)
if capabilities["authentication_failure_close"]?.try &.as?(Bool)
Expand Down
66 changes: 66 additions & 0 deletions src/lavinmq/auth/auth_cache.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# module LavinMQ
# class CacheEntry(T)
# getter value : T
# getter expires_at : Time

# def initialize(@value : T, ttl : Time::Span)
# @expires_at = Time.utc + ttl
# end

# def expired? : Bool
# Time.utc > @expires_at
# end
# end

# class Cache(K, V)
# def initialize(@default_ttl : Time::Span = 1.hour)
# @mutex = Mutex.new
# @data = Hash(K, CacheEntry(V)).new
# end

# def set(key : K, value : V, ttl : Time::Span = @default_ttl) : V
# @mutex.synchronize do
# @data[key] = CacheEntry.new(value, ttl)
# value
# end
# end

# def get?(key : K) : V?
# @mutex.synchronize do
# entry = @data[key]?
# return nil unless entry

# if entry.expired?
# @data.delete(key)
# nil
# else
# entry.value
# end
# end
# end

# def delete(key : K) : Bool
# @mutex.synchronize do
# @data.delete(key) ? true : false
# end
# end

# def cleanup
# @mutex.synchronize do
# @data.reject! { |_, entry| entry.expired? }
# end
# end

# def clear
# @mutex.synchronize do
# @data.clear
# end
# end

# def size : Int32
# @mutex.synchronize do
# @data.size
# end
# end
# end
# end
49 changes: 49 additions & 0 deletions src/lavinmq/auth/auth_chain.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
require "./auth_cache"

module LavinMQ
class AuthChain
@first_handler : AuthHandler?

def initialize(users : UserStore)
backends = Config.instance.auth_backends
if backends.nil? || backends.size == 0
add_handler(BasicAuthHandler.new(users))
else
# TODO: gather config for http and oauth and send into handlers
backends.each do |backend|
case backend
when "oauth"
add_handler(OAuth2Handler.new(users))
when "http"
add_handler(HTTPAuthHandler.new(users))
when "basic"
add_handler(BasicAuthHandler.new(users))
else
raise "Unsupported authentication backend: #{backend}"
end
end
end
end

def add_handler(handler : AuthHandler)
if first = @first_handler
current = first
while next_handler = current.@successor
current = next_handler
end
current.set_successor(handler)
else
@first_handler = handler
end
self
end

def authenticate(username : String, password : String)
# TODO: Cache the authorized users, and call authenticate from cache class
# if authorized = @auth_cache.get?(username)
# return authorized
# end
@first_handler.try &.authenticate(username, password)
end
end
end
22 changes: 22 additions & 0 deletions src/lavinmq/auth/auth_handler.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
module LavinMQ
abstract class AuthHandler
Log = LavinMQ::Log.for "auth.handler"
property successor : AuthHandler?
@log = Logger.new(Log)

abstract def authenticate(username : String, password : String)

def set_successor(service : AuthHandler) : AuthHandler
@successor = service
service
end

def try_next(username : String, password : String)
if successor = @successor
successor.authenticate(username, password)
else
nil
end
end
end
end
16 changes: 16 additions & 0 deletions src/lavinmq/auth/handlers/basic_auth.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
require "../auth_handler"
require "../../server"

module LavinMQ
class BasicAuthHandler < LavinMQ::AuthHandler
def initialize(@users : UserStore)
end

def authenticate(username : String, password : String)
user = @users[username]
return user if user && user.password && user.password.not_nil!.verify(password)
@log.warn { "Basic authentication failed" }
try_next(username, password)
end
end
end
21 changes: 21 additions & 0 deletions src/lavinmq/auth/handlers/http.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
require "http/client"
require "json"
require "../auth_handler"

module LavinMQ
class HTTPAuthHandler < AuthHandler
def initialize(@users : UserStore)
end

def authenticate(username : String, password : String)
# TODO: implement the HTTP authentication logic and permissions parser here
if password.starts_with?("http")
@log.warn { "HTTP authentication successful" }
return @users[username]
else
@log.warn { "HTTP authentication failed" }
return try_next(username, password)
end
end
end
end
47 changes: 47 additions & 0 deletions src/lavinmq/auth/handlers/oauth2.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
require "../auth_handler"
require "jwt"
require "../../config"
require "http/client"


module LavinMQ
class OAuth2Handler < LavinMQ::AuthHandler
def initialize(@users : UserStore)
end

# Temporary for tests
@token : String = LavinMQ::Config.instance.token
@public_key : String = LavinMQ::Config.instance.public_key


def authenticate(username : String, password : String)
begin
fetch_jwks_token
payload, header = JWT.decode(@token, key: @public_key, algorithm: JWT::Algorithm::RS256, verify: true)

pp payload
pp header
oauth_user
rescue ex : JWT::DecodeError
@log.warn { "OAuth2 authentication failed, could not decode token: #{ex}" }
try_next(username, password)
rescue ex : JWT::UnsupportedAlgorithmError
@log.warn { "OAuth2 authentication failed, unsupported algortihm: #{ex}" }
try_next(username, password)
rescue ex
@log.warn { "OAuth2 authentication failed: #{ex}" }
try_next(username, password)
end
end

private def fetch_jwks_token
end

def oauth_user
# Discuss ow to do this?
# TODO: Create a uset that will be deleted when it disconnects, but also cannot be authorised with basic auth.
# introduce the needed configs for validation, and parse the payload to get the user details
user = @users.create("oauth_user", "password")
end
end
end
Loading