Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,9 @@ agent/app_dumps/*.*
agent/bin/*.*
agent/laravel_agent/template_backup_*/**
agent/benchmark_results*/*

# Planning and development files
.plan
plan.md
**/plan.md
**/.plan
4 changes: 4 additions & 0 deletions dabgent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

121 changes: 24 additions & 97 deletions dabgent/dabgent_agent/examples/planning.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use dabgent_agent::agent::{self};
use dabgent_agent::handler::Handler;
use dabgent_agent::thread::{self};
use dabgent_agent::toolbox::{self, basic::toolset};
use dabgent_mq::EventStore;
use dabgent_mq::db::{Query, sqlite::SqliteStore};
use dabgent_agent::orchestrator::PlanningOrchestrator;
use dabgent_agent::validator::PythonUvValidator;
use dabgent_mq::db::sqlite::SqliteStore;
use dabgent_sandbox::dagger::Sandbox as DaggerSandbox;
use dabgent_sandbox::{Sandbox, SandboxDyn};
use dabgent_sandbox::Sandbox;
use eyre::Result;

#[tokio::main]
Expand All @@ -22,109 +19,39 @@ async fn run() {
let llm = rig::providers::anthropic::Client::new(api_key.as_str());
let sandbox = sandbox(&client).await?;
let store = store().await;

let tools = toolset(Validator);
let planning_worker = agent::Worker::new(llm, store.clone(), SYSTEM_PROMPT.to_owned(), tools);

let tools = toolset(Validator);
let mut sandbox_worker = agent::ToolWorker::new(sandbox.boxed(), store.clone(), tools);

tokio::spawn(async move {
let _ = planning_worker.run("planning", "thread").await;
});
tokio::spawn(async move {
let _ = sandbox_worker.run("planning", "thread").await;
});

let event = thread::Event::Prompted(
"Implement a service that takes CSV file as input and produces Hypermedia API as output. Make sure to run it in such a way it does not block the agent while running (it will be run by uv run main.py command)".to_owned(),

let orchestrator = PlanningOrchestrator::new(
store.clone(),
"example".to_string(),
"demo".to_string()
);
store
.push_event("planning", "thread", &event, &Default::default())
.await?;

let query = Query {
stream_id: "planning".to_owned(),
event_type: None,
aggregate_id: Some("thread".to_owned()),
};

let mut receiver = store.subscribe::<thread::Event>(&query)?;
let mut events = store.load_events(&query, None).await?;
let idle_timeout = std::time::Duration::from_secs(60);
loop {
match tokio::time::timeout(idle_timeout, receiver.next()).await {
Ok(Some(Ok(event))) => {
events.push(event.clone());
let thread = thread::Thread::fold(&events);
tracing::info!(?thread.state, ?event, "event");
if let thread::State::Done = thread.state {
break;
}
}
Ok(Some(Err(e))) => {
tracing::error!(error = ?e, "event stream error");
continue;
}
Ok(None) => {
tracing::warn!("event stream closed");
break;
}
Err(_) => {
tracing::warn!("no events for 60s, exiting");
break;
}
}
}


orchestrator.setup_workers(sandbox.boxed(), llm, PythonUvValidator).await?;

let task = "Implement a service that takes CSV file as input and produces Hypermedia API as output. Make sure to run it in such a way it does not block the agent while running (it will be run by uv run main.py command)";
orchestrator.process_message(task.to_string()).await?;

orchestrator.monitor_progress(|status| Box::pin(async move {
tracing::info!("Status: {}", status);
Ok(())
})).await?;
Ok(())
})
.await
.unwrap();
}).await.unwrap();
}

async fn sandbox(client: &dagger_sdk::DaggerConn) -> Result<DaggerSandbox> {
let opts = dagger_sdk::ContainerBuildOptsBuilder::default()
.dockerfile("Dockerfile")
.build()?;
let ctr = client
.container()
.build_opts(client.host().directory("./examples"), opts);
let ctr = client.container().build_opts(client.host().directory("./examples"), opts);
ctr.sync().await?;
let sandbox = DaggerSandbox::from_container(ctr);
Ok(sandbox)
Ok(DaggerSandbox::from_container(ctr))
}

async fn store() -> SqliteStore {
let pool = sqlx::SqlitePool::connect(":memory:")
.await
let pool = sqlx::SqlitePool::connect(":memory:").await
.expect("Failed to create in-memory SQLite pool");
let store = SqliteStore::new(pool);
store.migrate().await;
store
}

const SYSTEM_PROMPT: &str = "
You are a python software engineer.
Workspace is already set up using uv init.
Use uv package manager if you need to add extra libraries.
Program will be run using uv run main.py command.
You are also a planning expert who breaks down complex tasks to planning.md file and updates them there after each step.
";

pub struct Validator;

impl toolbox::Validator for Validator {
async fn run(&self, sandbox: &mut Box<dyn SandboxDyn>) -> Result<Result<(), String>> {
// Delegate timeout to Dagger via DAGGER_EXEC_TIMEOUT_SECS
// Here we just run the command and interpret exit codes
let result = sandbox.exec("uv run main.py").await?;
Ok(match result.exit_code {
0 | 124 => Ok(()),
code => Err(format!(
"code: {}\nstdout: {}\nstderr: {}",
code, result.stdout, result.stderr
)),
})
}
}
}
97 changes: 97 additions & 0 deletions dabgent/dabgent_agent/examples/test_event_flow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use dabgent_agent::orchestrator::PlanningOrchestrator;
use dabgent_agent::validator::PythonUvValidator;
use dabgent_mq::db::sqlite::SqliteStore;
use dabgent_mq::db::{EventStore, Query};
use dabgent_agent::thread;
use dabgent_sandbox::dagger::Sandbox as DaggerSandbox;
use dabgent_sandbox::Sandbox;
use eyre::Result;

/// Smoke-tests the event flow of a `PlanningOrchestrator` backed by an in-memory SQLite store.
///
/// The program performs three checks and prints what it observes:
/// 1. pushes a prompt with `process_message` and lists the events that `load_events`
///    returns for the `<stream_id>_planning` stream;
/// 2. constructs an Anthropic client with a placeholder key (no workers are started);
/// 3. subscribes to the same stream, pushes a second prompt and waits up to one second
///    for it to arrive on the subscription.
///
/// # Errors
/// Returns an error if the SQLite pool cannot be opened, or if pushing, loading or
/// subscribing to events fails. A subscription timeout is only reported on stdout.
#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    println!("Testing event flow...\n");

    // Setup store
    let pool = sqlx::SqlitePool::connect(":memory:").await?;
    let store = SqliteStore::new(pool);
    store.migrate().await;

    // Create orchestrator
    let stream_id = "test_stream".to_string();
    let aggregate_id = "test_aggregate".to_string();

    println!("Creating orchestrator with:");
    println!(" stream_id: {}", stream_id);
    println!(" aggregate_id: {}", aggregate_id);

    let orchestrator = PlanningOrchestrator::new(
        store.clone(),
        stream_id.clone(),
        aggregate_id.clone(),
    );

    // The orchestrator will use stream_id + "_planning"
    let actual_stream = format!("{}_planning", stream_id);
    println!(" actual stream (with suffix): {}", actual_stream);

    // Both the load and the subscription below target the same stream/aggregate pair,
    // so the query is built in one place.
    let planning_query = || Query {
        stream_id: actual_stream.clone(),
        event_type: None,
        aggregate_id: Some(aggregate_id.clone()),
    };

    // Check if we can push and retrieve events
    println!("\n1. Testing direct event push...");
    orchestrator.process_message("Test task".to_string()).await?;

    // Check if event was stored
    let events = store
        .load_events::<thread::Event>(&planning_query(), None)
        .await?;

    println!(" Events in store: {}", events.len());
    for (i, event) in events.iter().enumerate() {
        let summary = match event {
            thread::Event::Prompted(msg) => format!("Prompted: {}", prompt_preview(msg)),
            thread::Event::LlmCompleted(_) => "LlmCompleted".to_string(),
            thread::Event::ToolCompleted(_) => "ToolCompleted".to_string(),
            thread::Event::UserResponded(_) => "UserResponded".to_string(),
        };
        println!(" Event {}: {:?}", i, summary);
    }

    // Now test with workers (without actually running Dagger)
    println!("\n2. Testing with mock setup...");

    // Create a simple mock LLM client. It is intentionally never handed to a worker,
    // hence the underscore-prefixed binding.
    let _llm = rig::providers::anthropic::Client::new("test_key");

    println!(" Note: Workers will fail without real sandbox/LLM, but we can see if they start");

    // Try to subscribe to events
    println!("\n3. Testing event subscription...");
    let mut receiver = store.subscribe::<thread::Event>(&planning_query())?;

    // Push another event
    orchestrator.process_message("Another test".to_string()).await?;

    // Try to receive it
    match tokio::time::timeout(std::time::Duration::from_secs(1), receiver.next()).await {
        Ok(Some(Ok(event))) => {
            let summary = match &event {
                thread::Event::Prompted(msg) => format!("Prompted: {}", prompt_preview(msg)),
                _ => "Other".to_string(),
            };
            println!(" Received event via subscription: {:?}", summary);
        }
        Ok(Some(Err(e))) => println!(" Subscription error: {}", e),
        Ok(None) => println!(" Subscription closed"),
        Err(_) => println!(" Timeout waiting for event"),
    }

    println!("\n✅ Event flow test completed");
    Ok(())
}

/// Returns at most the first 50 characters of `msg` for display.
///
/// Truncation is done per `char` rather than by byte index, because slicing a `str`
/// at a byte offset that falls inside a multi-byte UTF-8 sequence panics.
fn prompt_preview(msg: &str) -> String {
    const PREVIEW_CHARS: usize = 50;
    msg.chars().take(PREVIEW_CHARS).collect()
}
72 changes: 72 additions & 0 deletions dabgent/dabgent_agent/examples/test_planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use dabgent_agent::planner::{Planner, PlanUpdate};
use dabgent_mq::db::sqlite::SqliteStore;
use eyre::Result;

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.init();

println!("Testing Planner functionality...\n");

// Setup store
let pool = sqlx::SqlitePool::connect(":memory:").await?;
let store = SqliteStore::new(pool);
store.migrate().await;

// Create planner
let planner = Planner::new(
store.clone(),
"test_planner".to_string(),
"test_aggregate".to_string(),
);

// Test 1: Start planning
println!("1. Starting planning for a task...");
planner.start_planning("Build a REST API with authentication".to_string()).await?;

// Check if plan.md was created
if let Ok(content) = tokio::fs::read_to_string("plan.md").await {
println!(" ✅ plan.md created:");
println!(" {}", content.lines().take(5).collect::<Vec<_>>().join("\n "));
}

// Test 2: Add steps
println!("\n2. Adding steps to the plan...");
planner.update_plan(PlanUpdate::AddStep("Design API endpoints".to_string())).await?;
planner.update_plan(PlanUpdate::AddStep("Implement user model".to_string())).await?;
planner.update_plan(PlanUpdate::AddStep("Add JWT authentication".to_string())).await?;

// Test 3: Request clarification
println!("\n3. Requesting clarification...");
planner.update_plan(PlanUpdate::RequestClarification(
"Which database should be used - PostgreSQL or MongoDB?".to_string()
)).await?;

// Test 4: Complete a step
println!("\n4. Marking step as complete...");
planner.update_plan(PlanUpdate::CompleteStep(0)).await?;

// Test 5: Add notes
println!("\n5. Adding notes...");
planner.update_plan(PlanUpdate::AddNote(
"Using JWT for stateless authentication".to_string()
)).await?;

// Test 6: Complete planning
println!("\n6. Completing planning...");
planner.complete_planning().await?;

// Show final plan
println!("\n=== Final Plan ===");
if let Ok(content) = tokio::fs::read_to_string("plan.md").await {
println!("{}", content);
}

// Clean up
tokio::fs::remove_file("plan.md").await.ok();

println!("\n✅ All planner tests passed!");
Ok(())
}
Loading
Loading