diff --git a/rust/crates/sift_cli/Cargo.toml b/rust/crates/sift_cli/Cargo.toml index 285563341..9e3483cdc 100644 --- a/rust/crates/sift_cli/Cargo.toml +++ b/rust/crates/sift_cli/Cargo.toml @@ -23,13 +23,24 @@ flate2 = "1.1.2" indicatif = "0.18.0" parquet = "56.2.0" pbjson-types = { workspace = true } +prost = "0.13.5" reqwest = "0.12.23" sift_pbfs = { workspace = true } sift_rs = { workspace = true } -tokio = { version = "1.47.1", features = ["full", "net", "time"] } +sift_stream = { workspace = true } +tokio = { version = "1.47.1", features = ["full", "net", "time", "macros", "rt-multi-thread", "signal"] } tokio-stream = "0.1.17" +tonic = { workspace = true } +tonic-reflection = "0.12" toml = "0.8.23" zip = "6.0.0" [dev-dependencies] indoc = "2.0.6" + +[dependencies.uuid] +version = "1.19.0" +features = ["v4"] + +[build-dependencies] +tonic-build = "0.12" diff --git a/rust/crates/sift_cli/src/cli/mod.rs b/rust/crates/sift_cli/src/cli/mod.rs index 0a879fb1d..1c6d5436c 100644 --- a/rust/crates/sift_cli/src/cli/mod.rs +++ b/rust/crates/sift_cli/src/cli/mod.rs @@ -48,6 +48,10 @@ pub enum Cmd { /// Import time series files into Sift #[command(subcommand)] Import(ImportCmd), + + /// Start a test gRPC server for streaming. + #[command(subcommand)] + TestServer(TestServerCmd), } #[derive(Subcommand)] @@ -164,6 +168,31 @@ pub enum ConfigCmd { Update(ConfigUpdateArgs), } +#[derive(Subcommand)] +pub enum TestServerCmd { + /// Start a test ingestion server. + Run(TestServerArgs), +} + +#[derive(clap::Args)] +pub struct TestServerArgs { + /// The address to serve gRPC server. + #[arg(short, long, default_value_t = String::from("0.0.0.0:50051"))] + pub local_address: String, + + /// Whether to stream metrics to Sift. + #[arg(short, long)] + pub stream_metrics: bool, + + /// The asset name to use when streaming server ingestion metrics. + #[arg(short, long)] + pub metrics_asset_name: Option, + + /// Include to use plain output. Use this option in scripts or when saving logs. + #[arg(short, long)] + pub plain_output: bool, +} + #[derive(clap::Args)] pub struct ConfigUpdateArgs { /// Edit or create a profile interactively (ignores other flags) diff --git a/rust/crates/sift_cli/src/cmd/import/csv.rs b/rust/crates/sift_cli/src/cmd/import/csv.rs index 627556efd..9e6161806 100644 --- a/rust/crates/sift_cli/src/cmd/import/csv.rs +++ b/rust/crates/sift_cli/src/cmd/import/csv.rs @@ -300,14 +300,14 @@ fn create_data_import_request( let mut enum_configs = Vec::new(); let mut bit_field_configs = Vec::new(); - if data_type == ChannelDataType::Enum.into() { + if data_type == ChannelDataType::Enum as i32 { let Some(configs) = enum_configs_iter.next() else { return Err(anyhow!( "'{name}' was declared as type enum but --enum-config was not specified" )); }; enum_configs = configs; - } else if data_type == ChannelDataType::BitField.into() { + } else if data_type == ChannelDataType::BitField as i32 { let Some(configs) = bit_field_configs_iter.next() else { return Err(anyhow!( "'{name}' was declared as type bit-field but --bit-field-config was not specified" diff --git a/rust/crates/sift_cli/src/cmd/mod.rs b/rust/crates/sift_cli/src/cmd/mod.rs index c508a4d6e..8b3a26e67 100644 --- a/rust/crates/sift_cli/src/cmd/mod.rs +++ b/rust/crates/sift_cli/src/cmd/mod.rs @@ -8,6 +8,7 @@ pub mod completions; pub mod config; pub mod export; pub mod import; +pub mod test_server; pub struct Context { pub grpc_uri: String, diff --git a/rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs b/rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs new file mode 100644 index 000000000..d3beef7c2 --- /dev/null +++ b/rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs @@ -0,0 +1,127 @@ +use super::Context; +use anyhow::{Ok, anyhow}; +use crossterm::style::Stylize; +use sift_stream::{ + ChannelConfig, ChannelDataType, ChannelValue, Credentials, Flow, FlowConfig, + IngestionConfigEncoder, IngestionConfigForm, RecoveryStrategy, RetryPolicy, SiftStream, + SiftStreamBuilder, TimeValue, +}; + +/// Streams metrics to Sift. +pub struct MetricsStreamingClient { + ctx: Context, + asset_name: String, + sift_stream: Option>, +} + +impl MetricsStreamingClient { + pub fn build( + ctx: Context, + stream_metrics: &bool, + asset_name: &Option, + ) -> Result, anyhow::Error> { + if !stream_metrics { + return Ok(None); + } + + let Some(asset_name) = asset_name else { + return Err(anyhow!( + "must specify {} with streaming enabled", + "--metrics_asset_name".cyan() + )); + }; + + Ok(Some(MetricsStreamingClient { + ctx, + asset_name: asset_name.clone(), + sift_stream: None, + })) + } + + /// Initialize SiftStream and create ingestion config. + pub async fn initialize(&mut self) -> Result<(), anyhow::Error> { + let credentials = Credentials::Config { + apikey: self.ctx.api_key.clone(), + uri: self.ctx.grpc_uri.clone(), + }; + + let ingestion_config = IngestionConfigForm { + asset_name: self.asset_name.to_string(), + client_key: "stress-test-ingestion-config-test".into(), + flows: vec![FlowConfig { + name: "metrics".into(), + channels: vec![ + ChannelConfig { + name: "total_num_streams".into(), + description: "Total number of streams created".into(), + data_type: ChannelDataType::Uint32.into(), + ..Default::default() + }, + ChannelConfig { + name: "total_num_bytes_read".into(), + description: "Total number of bytes read".into(), + unit: "B".into(), + data_type: ChannelDataType::Uint64.into(), + ..Default::default() + }, + ChannelConfig { + name: "total_num_messages".into(), + description: "Total number of messages received".into(), + unit: "message".into(), + data_type: ChannelDataType::Uint64.into(), + ..Default::default() + }, + ChannelConfig { + name: "bytes_per_s".into(), + description: "Number of bytes received per second".into(), + data_type: ChannelDataType::Double.into(), + unit: "B/s".into(), + ..Default::default() + }, + ChannelConfig { + name: "messages_per_s".into(), + description: "Number of messages received per second".into(), + unit: "message/s".into(), + data_type: ChannelDataType::Double.into(), + ..Default::default() + }, + ], + }], + }; + + let sift_stream = SiftStreamBuilder::new(credentials) + .ingestion_config(ingestion_config) + .recovery_strategy(RecoveryStrategy::RetryOnly(RetryPolicy::default())) + .build() + .await?; + + self.sift_stream = Some(sift_stream); + + Ok(()) + } + + /// Send metrics to Sift. + pub async fn ingest(&mut self, metrics: Metrics) { + let flow = Flow::new( + "metrics", + TimeValue::now(), + &[ + ChannelValue::new("total_num_streams", metrics.total_num_streams), + ChannelValue::new("total_num_bytes_read", metrics.total_num_bytes_read), + ChannelValue::new("total_num_messages", metrics.total_num_messages), + ChannelValue::new("bytes_per_s", metrics.bytes_per_s), + ChannelValue::new("messages_per_s", metrics.messages_per_s), + ], + ); + + self.sift_stream.as_mut().unwrap().send(flow).await.unwrap(); + } +} + +pub struct Metrics { + pub total_num_streams: u32, + pub total_num_bytes_read: u64, + pub total_num_messages: u64, + pub bytes_per_s: f64, + pub messages_per_s: f64, +} diff --git a/rust/crates/sift_cli/src/cmd/test_server/mod.rs b/rust/crates/sift_cli/src/cmd/test_server/mod.rs new file mode 100644 index 000000000..810b5e024 --- /dev/null +++ b/rust/crates/sift_cli/src/cmd/test_server/mod.rs @@ -0,0 +1,115 @@ +use super::Context; +use crate::cmd::test_server::metrics_streaming_client::MetricsStreamingClient; +use crate::{cli::TestServerArgs, util::tty::Output}; +use anyhow::{Context as AnyhowContext, Result}; +use server::TestServer; +use sift_rs::assets::v1::asset_service_server::AssetServiceServer; +use sift_rs::ingest::v1::ingest_service_server::IngestServiceServer; +use sift_rs::ingestion_configs::v2::ingestion_config_service_server::IngestionConfigServiceServer; +use sift_rs::ping::v1::ping_service_server::PingServiceServer; +use std::process::ExitCode; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::sync::watch; +use tonic::transport::Server; +use tonic_reflection::server::Builder; + +pub mod metrics_streaming_client; +pub mod server; +use crate::cmd::test_server::metrics_streaming_client::Metrics; + +pub async fn run(ctx: Context, args: TestServerArgs) -> Result { + let addr = args.local_address.parse().context(format!( + "failed to parse local_address: {}", + args.local_address + ))?; + + // Initialize streaming client. + let mut streaming_client = + MetricsStreamingClient::build(ctx, &args.stream_metrics, &args.metrics_asset_name) + .context("failed to create metrics streaming client")?; + + if let Some(client) = streaming_client.as_mut() { + client + .initialize() + .await + .context("failed to initialize streaming client")?; + } + + // Channel to signal program exit. + let (shutdown_tx, mut shutdown_rx) = watch::channel(false); + let mut shutdown_rx2 = shutdown_rx.clone(); + + // Channel to send metrics. + let (metrics_tx, mut metrics_rx) = mpsc::channel::(1024); + + // Initialize gRPC server. + let server = Arc::new(TestServer::default()); + + // Start task to calculate ingestion metrics. + let server_arc = Arc::clone(&server); + let calc_stats_task = tokio::spawn(async move { + server_arc + .calculate_metrics( + &mut shutdown_rx, + metrics_tx, + args.stream_metrics, + args.plain_output, + ) + .await + .context("calculate metrics task failed") + .unwrap(); + }); + + // Start task to ingest metrics to Sift. + let ingest_metrics_task = tokio::spawn(async move { + if let Some(client) = streaming_client.as_mut() { + loop { + tokio::select! { + _ = shutdown_rx2.changed() => { + Output::new().line("Ingest task shutting down").print(); + break; + } + Some(metrics) = metrics_rx.recv() => { + client.ingest(metrics).await; + } + }; + } + } + }); + + let reflection_service = Builder::configure() + .register_encoded_file_descriptor_set(sift_rs::assets::v1::FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(sift_rs::ingest::v1::FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(sift_rs::ingestion_configs::v2::FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(sift_rs::ping::v1::FILE_DESCRIPTOR_SET) + .build_v1alpha() + .context("failed to create gRPC reflection service")?; + + Output::new() + .line(format!("Server listening on {addr}")) + .print(); + + Server::builder() + .add_service(reflection_service) + .add_service(PingServiceServer::from_arc(server.clone())) + .add_service(IngestServiceServer::from_arc(server.clone())) + .add_service(IngestionConfigServiceServer::from_arc(server.clone())) + .add_service(AssetServiceServer::from_arc(server.clone())) + .serve_with_shutdown(addr, async move { + tokio::signal::ctrl_c().await.unwrap(); + let _ = shutdown_tx.send(true); + }) + .await?; + + calc_stats_task + .await + .context("failed to await calculation task")?; + ingest_metrics_task + .await + .context("failed to await ingestion task")?; + + Output::new().line("Exiting.").print(); + + Ok(ExitCode::SUCCESS) +} diff --git a/rust/crates/sift_cli/src/cmd/test_server/server.rs b/rust/crates/sift_cli/src/cmd/test_server/server.rs new file mode 100644 index 000000000..0b6b889e1 --- /dev/null +++ b/rust/crates/sift_cli/src/cmd/test_server/server.rs @@ -0,0 +1,313 @@ +use crate::cmd::test_server::metrics_streaming_client::Metrics; +use crate::util::tty::Output; +use anyhow::{Context, Ok as AnyhowOk, Result as AnyhowResult}; +use crossterm::{ExecutableCommand, cursor, terminal}; +use prost::Message; +use sift_rs::assets::v1::{ + Asset, GetAssetRequest, GetAssetResponse, asset_service_server::AssetService, +}; +use sift_rs::ingest::v1::{ + IngestArbitraryProtobufDataStreamRequest, IngestArbitraryProtobufDataStreamResponse, + IngestWithConfigDataStreamRequest, IngestWithConfigDataStreamResponse, + ingest_service_server::IngestService, +}; +use sift_rs::ingestion_configs::v2::{ingestion_config_service_server::IngestionConfigService, *}; +use sift_rs::ping::v1::{PingRequest, PingResponse, ping_service_server::PingService}; +use std::io::stdout; +use std::sync::atomic::AtomicBool; +use std::{ + collections::HashMap, + sync::{ + Mutex, + atomic::{AtomicU32, AtomicU64, Ordering::Relaxed}, + }, +}; +use tokio::sync::mpsc::Sender; +use tokio::sync::watch; +use tokio::time::Duration; +use tokio_stream::StreamExt; +use tonic::{Request, Response, Status, Streaming}; +use uuid::Uuid; + +#[derive(Default)] +pub struct TestServer { + /// Whether the server is done processing streams. + done: AtomicBool, + + /// Total number of streams created. + total_num_streams: AtomicU32, + + /// Total number of messages received. + total_num_messages: AtomicU64, + + /// Total number of bytes received. + total_num_bytes_read: AtomicU64, + + // Total number of ingestion configs created. + total_num_ingestion_configs: AtomicU32, + + /// Ingestion configs by Ingestion Config ID. + ingestion_configs_by_id: Mutex>, + + /// Assets by Asset ID. + asset_ids_by_name: Mutex>, +} + +/// Ingested data and drops it. Calculates ingestion stats and optionally streams them to Sift. +impl TestServer { + /// Calculate ingestion metrics and optionally stream them to Sift. + pub async fn calculate_metrics( + &self, + shutdown: &mut watch::Receiver, + metrics_tx: Sender, + streaming_enabled: bool, + plain_output: bool, + ) -> AnyhowResult<()> { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + let mut stdout = stdout(); + + let mut last_total_num_bytes_read: u64 = 0; + let mut last_total_num_messages: u64 = 0; + + loop { + tokio::select! { + _ = shutdown.changed() => { + self.done.fetch_or(true, Relaxed); + Output::new().line("Metrics task shutting down").print(); + return AnyhowOk(()); + } + + _ = interval.tick() => { + let current_total_num_bytes_read = self.total_num_bytes_read.load(Relaxed); + let current_total_num_messages = self.total_num_messages.load(Relaxed); + let current_total_num_streams = self.total_num_streams.load(Relaxed); + let bytes_per_s = current_total_num_bytes_read - last_total_num_bytes_read; + let messages_per_s = current_total_num_messages - last_total_num_messages; + + last_total_num_bytes_read = current_total_num_bytes_read; + last_total_num_messages = current_total_num_messages; + + if !plain_output { + // Clear terminal and print metrics. + stdout + .execute(terminal::Clear(terminal::ClearType::All)) + .context("failed to clear terminal")?; + stdout.execute(cursor::MoveTo(0, 0)) + .context("failed to move terminal cursor")?; + stdout.execute(cursor::MoveUp(5)) + .context("failed to move terminal cursor")?; + stdout.execute(terminal::Clear(terminal::ClearType::FromCursorDown)) + .context("failed to move terminal cursor")?; + } else { + Output::new().line(format!("-----")).print(); + Output::new().line(format!("{}", chrono::Local::now().to_rfc3339())).print(); + } + + Output::new().line(format!("Total num streams: {current_total_num_streams}")).print(); + Output::new().line(format!("Total num bytes: {current_total_num_bytes_read}")).print(); + Output::new().line(format!("Total num messages: {current_total_num_messages}")).print(); + Output::new().line(format!("bytes/s: {bytes_per_s}")).print(); + Output::new().line(format!("messages/s: {messages_per_s}")).print(); + + // Stream to Sift. + if streaming_enabled { + let e = metrics_tx.try_send(Metrics{ + total_num_streams: current_total_num_streams, + total_num_bytes_read: current_total_num_bytes_read, + total_num_messages: current_total_num_messages, + bytes_per_s: bytes_per_s as f64, + messages_per_s: messages_per_s as f64, + }); + + if e.is_err() { + Output::new().line(format!("{e:?}")); + } + } + } + } + } + } +} + +#[tonic::async_trait] +impl PingService for TestServer { + async fn ping(&self, _request: Request) -> Result, Status> { + Ok(Response::new(PingResponse::default())) + } +} + +#[tonic::async_trait] +impl AssetService for TestServer { + /// Returns an asset ID. + async fn get_asset( + &self, + request: Request, + ) -> Result, Status> { + let asset_ids_by_name = self.asset_ids_by_name.lock().unwrap(); + let inner = request.into_inner(); + + for (asset_id, asset_name) in asset_ids_by_name.iter() { + if inner.asset_id == *asset_id { + return Ok(Response::new(GetAssetResponse { + asset: Some(Asset { + asset_id: asset_id.into(), + name: asset_name.into(), + ..Default::default() + }), + })); + } + } + + Err(Status::not_found("asset not found")) + } + + /// No-op. + async fn delete_asset( + &self, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } + + /// No-op. + async fn list_assets( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new( + sift_rs::assets::v1::ListAssetsResponse::default(), + )) + } + + /// No-op. + async fn update_asset( + &self, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } + + /// No-op. + async fn archive_asset( + &self, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } +} + +#[tonic::async_trait] +impl IngestionConfigService for TestServer { + /// Returns an arbitrary Ingestion Config with a new UUID. + async fn get_ingestion_config( + &self, + request: Request, + ) -> Result, Status> { + let inner = request.into_inner(); + let ingestion_configs = self.ingestion_configs_by_id.lock().unwrap(); + let ingestion_config = ingestion_configs + .get(&inner.ingestion_config_id) + .ok_or(Status::not_found("ingestion config not found"))?; + + Ok(Response::new(GetIngestionConfigResponse { + ingestion_config: Some(ingestion_config.clone()), + })) + } + + /// Returns an empty list of ingestion configs. + async fn list_ingestion_configs( + &self, + _request: Request, + ) -> Result, Status> { + let ingestion_configs = self.ingestion_configs_by_id.lock().unwrap(); + + let mut all_ingestion_configs: Vec = + Vec::with_capacity(ingestion_configs.len()); + + for ingestion_config in ingestion_configs.values() { + all_ingestion_configs.push(ingestion_config.clone()); + } + + Ok(Response::new(ListIngestionConfigsResponse { + ingestion_configs: all_ingestion_configs, + next_page_token: "".into(), + })) + } + + /// Returns an arbitrary Ingestion Config with a new UUID. + async fn create_ingestion_config( + &self, + request: Request, + ) -> Result, Status> { + self.total_num_ingestion_configs.fetch_add(1, Relaxed); + let inner = request.into_inner(); + + let mut assets = self.asset_ids_by_name.lock().unwrap(); + let default_asset_id = Uuid::new_v4().to_string(); + let asset_id = assets + .get(&inner.asset_name) + .unwrap_or(&default_asset_id) + .to_string(); + + let new_ingestion_config = CreateIngestionConfigResponse { + ingestion_config: Some(IngestionConfig { + ingestion_config_id: Uuid::new_v4().to_string(), + asset_id: asset_id.clone(), + client_key: inner.client_key, + }), + }; + + assets.insert(asset_id.clone(), inner.asset_name); + + Ok(Response::new(new_ingestion_config)) + } + + /// No-op. + async fn create_ingestion_config_flows( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(CreateIngestionConfigFlowsResponse::default())) + } + + /// No-op. + async fn list_ingestion_config_flows( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(ListIngestionConfigFlowsResponse::default())) + } +} + +#[tonic::async_trait] +impl IngestService for TestServer { + /// Store ingestion stats. + async fn ingest_with_config_data_stream( + &self, + request: Request>, + ) -> Result, Status> { + self.total_num_streams.fetch_add(1, Relaxed); + + let mut stream = request.into_inner(); + while let Some(msg) = stream.next().await { + self.total_num_messages.fetch_add(1, Relaxed); + let inner = msg?; + self.total_num_bytes_read + .fetch_add(inner.encoded_len() as u64, Relaxed); + + if self.done.load(Relaxed) { + break; + } + } + + Ok(Response::new(IngestWithConfigDataStreamResponse::default())) + } + + /// No-op. + async fn ingest_arbitrary_protobuf_data_stream( + &self, + _: Request>, + ) -> Result, Status> { + unimplemented!() + } +} diff --git a/rust/crates/sift_cli/src/main.rs b/rust/crates/sift_cli/src/main.rs index 2b73f1cb7..fbef1f935 100644 --- a/rust/crates/sift_cli/src/main.rs +++ b/rust/crates/sift_cli/src/main.rs @@ -84,6 +84,9 @@ fn run(clargs: cli::Args) -> Result { cli::ExportCmd::Run(args) => run_future(cmd::export::run(ctx, args)), cli::ExportCmd::Asset(args) => run_future(cmd::export::asset(ctx, args)), }, + Cmd::TestServer(cmd) => match cmd { + cli::TestServerCmd::Run(args) => run_future(cmd::test_server::run(ctx, args)), + }, _ => Ok(ExitCode::SUCCESS), } }