Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter out expired tokens #163

Merged
merged 1 commit into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion rust/src/security/sasl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl SaslReader {

let mut bytes = buf.freeze();
let rpc_response = RpcResponseHeaderProto::decode_length_delimited(&mut bytes)?;
debug!("{:?}", rpc_response);
debug!("RPC response: {:?}", rpc_response);

match RpcStatusProto::try_from(rpc_response.status).unwrap() {
RpcStatusProto::Error => {
Expand Down Expand Up @@ -339,6 +339,8 @@ impl SaslWriter {
}

async fn send_sasl_message(&mut self, message: &RpcSaslProto) -> io::Result<()> {
debug!("Sending SASL message {:?}", message);

let header_buf = Self::create_request_header().encode_length_delimited_to_vec();
let message_buf = message.encode_length_delimited_to_vec();
let size = (header_buf.len() + message_buf.len()) as u32;
Expand Down
63 changes: 58 additions & 5 deletions rust/src/security/user.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use bytes::{Buf, Bytes};
use chrono::Utc;
use log::debug;
use prost::Message;
use std::env;
Expand All @@ -13,6 +14,7 @@ use crate::proto::common::TokenProto;
use crate::proto::hdfs::AccessModeProto;
use crate::proto::hdfs::BlockTokenSecretProto;
use crate::proto::hdfs::StorageTypeProto;
use crate::HdfsError;
use crate::Result;

const HADOOP_USER_NAME: &str = "HADOOP_USER_NAME";
Expand Down Expand Up @@ -123,6 +125,48 @@ impl BlockTokenIdentifier {
}
}

#[derive(Debug)]
#[allow(dead_code)]
struct TokenIdentifier {
owner: String,
renewer: String,
real_user: String,
issue_date: i64,
max_date: i64,
sequence_number: i32,
master_key_id: i32,
}

impl TryFrom<Vec<u8>> for TokenIdentifier {
type Error = HdfsError;

fn try_from(value: Vec<u8>) -> std::result::Result<Self, Self::Error> {
let mut buf = Bytes::from(value);
let version = buf.get_u8();
if version != 0 {
panic!();
}

let owner = parse_vint_string(&mut buf)?;
let renewer = parse_vint_string(&mut buf)?;
let real_user = parse_vint_string(&mut buf)?;
let issue_date = parse_vlong(&mut buf);
let max_date = parse_vlong(&mut buf);
let sequence_number = parse_vint(&mut buf);
let master_key_id = parse_vint(&mut buf);

Ok(TokenIdentifier {
owner,
renewer,
real_user,
issue_date,
max_date,
sequence_number,
master_key_id,
})
}
}

#[derive(Debug)]
#[allow(dead_code)]
pub struct Token {
Expand Down Expand Up @@ -320,7 +364,13 @@ impl User {
pub(crate) fn get_token(&self, kind: &str, service: &str) -> Option<&Token> {
self.tokens
.iter()
.find(|t| t.kind == kind && t.service == service)
.filter(|t| t.kind == kind && t.service == service)
.find(|t| {
// Ignore any tokens that are set to expire in the next 60 seconds
let token_identifier: TokenIdentifier = t.identifier.clone().try_into().unwrap();
debug!("Token Identifier: {:?}", token_identifier);
token_identifier.max_date > Utc::now().timestamp_millis() + 60000
})
}

pub(crate) fn get_user_info_from_principal(principal: &str) -> UserInfo {
Expand Down Expand Up @@ -385,7 +435,9 @@ mod tests {
assert_eq!(tokens.len(), 1);
assert_eq!(tokens[0].kind, "HDFS_DELEGATION_TOKEN");
assert_eq!(tokens[0].service, "127.0.0.1:9000");
tokens.iter().for_each(|t| println!("{:?}", t));

let token_identifier: TokenIdentifier = tokens[0].identifier.clone().try_into().unwrap();
assert_eq!(token_identifier.max_date, 1690672432660)
}

#[test]
Expand Down Expand Up @@ -413,11 +465,13 @@ mod tests {
assert_eq!(tokens.len(), 1);
assert_eq!(tokens[0].kind, "HDFS_DELEGATION_TOKEN");
assert_eq!(tokens[0].service, "127.0.0.1:9000");
tokens.iter().for_each(|t| println!("{:?}", t));

let token_identifier: TokenIdentifier = tokens[0].identifier.clone().try_into().unwrap();
assert_eq!(token_identifier.max_date, 1686955057021)
}

#[test]
fn test_load_token_identifier() {
fn test_load_block_token_identifier() {
let token = [
138u8, 1, 142, 89, 190, 30, 189, 140, 100, 197, 210, 104, 0, 0, 0, 4, 104, 100, 102,
115, 0, 0, 0, 40, 66, 80, 45, 57, 55, 51, 52, 55, 55, 51, 54, 48, 45, 49, 57, 50, 46,
Expand All @@ -433,7 +487,6 @@ mod tests {
];

let token_identifier = BlockTokenIdentifier::from_identifier(&token).unwrap();
println!("{:?}", token_identifier);
assert_eq!(token_identifier.user_id, "hdfs");
assert_eq!(
token_identifier.block_pool_id,
Expand Down
Loading