-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtest_pdc_client.rs
151 lines (125 loc) · 4.46 KB
/
test_pdc_client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#![allow(unused)]
use arrow::array::{Array, Datum};
use arrow::ipc::reader::FileReader;
use bytes::Bytes;
use pmu::pdc_buffer_server;
use pmu::pdc_client::{ControlMessage, PDCClient};
use pmu::pdc_server::{run_mock_server, Protocol, ServerConfig};
use reqwest;
use std::io::Cursor;
use std::time::Duration;
use tokio::time;
#[tokio::test]
async fn test_pdc_client_server_communication() {
// Start mock server in background
println!("initializing server");
let server_config =
ServerConfig::new("127.0.0.1".to_string(), 4712, Protocol::TCP, 30.0).unwrap();
let server_handle = tokio::spawn(async move {
if let Err(e) = run_mock_server(server_config).await {
println!("Mock server error: {}", e)
};
});
// Give server time to start
println!("Waiting for server to start");
time::sleep(Duration::from_secs(1)).await;
// Create and start client
println!("Creating PDC Client...");
let (mut pdc_client, control_rx, mut data_rx) =
PDCClient::new("127.0.0.1", 4712, 1, Duration::from_secs(120))
.await
.expect("Failed to create PDC Client");
let control_tx = pdc_client.get_control_sender();
// Start client in background
println!("Starting stream.");
let client_handle = tokio::spawn(async move {
println!("PDC client stream started");
pdc_client.start_stream().await;
println!("PDC client stream ended");
});
// Test sequence
time::sleep(Duration::from_secs(2)).await;
// Request buffer
println!("Requesting buffer");
control_tx.send(ControlMessage::GetBuffer).await.unwrap();
// Check received data with timeout
match tokio::time::timeout(Duration::from_secs(3), data_rx.recv()).await {
Ok(Some(buffer)) => {
println!("Received buffer of size: {}", buffer.len());
assert!(!buffer.is_empty(), "Buffer should not be empty");
}
Ok(None) => panic!("Channel closed"),
Err(_) => panic!("Timeout waiting for buffer"),
}
// Stop client
println!("stopping client");
control_tx.send(ControlMessage::Stop);
// Clean up
client_handle.abort();
server_handle.abort();
}
#[tokio::test]
async fn test_buffer_server_data_endpoint() {
// Start mock PDC server
println!("Starting mock PDC server...");
let pdc_server_config =
ServerConfig::new("127.0.0.1".to_string(), 4712, Protocol::TCP, 30.0).unwrap();
let pdc_server_handle = tokio::spawn(async move {
if let Err(e) = run_mock_server(pdc_server_config).await {
println!("Mock server error: {}", e);
}
});
// Give PDC server time to start
time::sleep(Duration::from_secs(1)).await;
// Start buffer server
println!("Starting buffer server...");
std::env::set_var("PDC_HOST", "127.0.0.1");
std::env::set_var("PDC_PORT", "4712");
std::env::set_var("SERVER_PORT", "3000");
let buffer_server_handle = tokio::spawn(async {
if let Err(e) = pmu::pdc_buffer_server::run().await {
println!("Buffer server error: {}", e);
}
});
// Give buffer server time to start
time::sleep(Duration::from_secs(2)).await;
// Make request using reqwest
println!("Making request to buffer server...");
let client = reqwest::Client::new();
let response = client
.get("http://127.0.0.1:3000/data")
.send()
.await
.expect("Failed to get response");
assert!(response.status().is_success());
// Get response bytes
let arrow_buffer = response
.bytes()
.await
.expect("Failed to get response bytes")
.to_vec();
let cursor = Cursor::new(&arrow_buffer);
// Convert to Arrow RecordBatch
let reader = FileReader::try_new(cursor, None).expect("Failed to create Arrow reader");
// Print schema
println!("\nSchema:");
println!("{}", reader.schema());
// Print first few rows
println!("\nFirst few rows:");
for batch in reader {
let batch = batch.expect("Failed to read batch");
println!("Number of rows: {}", batch.num_rows());
for col_idx in 0..batch.num_columns() {
println!(
"{}: {:?} | ",
batch.schema().field(col_idx).name(),
batch.column(col_idx).slice(0, 5)
);
}
println!();
break; // Just print the first batch
}
// Clean up
buffer_server_handle.abort();
pdc_server_handle.abort();
}