This content originally appeared on Level Up Coding - Medium and was authored by Senthil E
Explore how OpenAI’s experimental framework revolutionizes how we build and orchestrate AI agent systems.
Have you ever wondered how to manage multiple AI agents working together seamlessly, each with their specialized skills and responsibilities? Enter OpenAI’s Swarm, an experimental framework revolutionizing how we approach multi-agent AI systems.
Introduction: Why Swarm Matters
In the rapidly evolving landscape of AI development, one of the most challenging aspects is managing complex workflows that require multiple specialized capabilities. While a single well-crafted prompt can handle many tasks, real-world applications often demand more nuanced approaches. Imagine a customer service system that needs to handle sales, refunds, technical support, and escalations, each requiring different expertise and tools.
This is where Swarm comes in. Released by OpenAI as an educational framework, Swarm introduces an elegant solution to orchestrating multiple AI agents through two fundamental concepts: Routines and Handoffs.
The Problem Swarm Solves
Traditional approaches to building AI systems often face several challenges:
- Complexity Management: As systems grow, cramming all capabilities into a single prompt becomes unwieldy and prone to errors.
- Specialization: Different tasks require different instructions and tools, making it difficult to maintain clarity and efficiency.
- Flow Control: Managing transitions between different types of tasks can become complicated and brittle.
Swarm addresses these challenges by introducing a lightweight, intuitive framework that allows developers to:
- Create specialized agents with focused responsibilities
- Enable seamless transitions between agents through handoffs
- Maintain conversation context across agent transitions
- Implement function calling for real-world actions
A New Paradigm in Agent Orchestration
What makes Swarm particularly interesting is its approach to agent design. Rather than trying to create a single “super agent” that can handle everything, Swarm embraces the concept of specialized agents working together. Each agent is essentially a “routine” — a set of instructions and tools designed for specific tasks.
Think of it like a well-organized company: instead of having one employee trying to do everything, you have specialized departments that can hand off tasks to each other as needed. The sales team handles initial inquiries, the technical team manages product issues, and the support team handles customer service — all working together seamlessly.
Why Developers Should Care
The beauty of Swarm lies in its simplicity and flexibility. While it’s currently an experimental framework, its patterns and principles can be applied to build robust, scalable AI systems. Whether you’re building:
- Customer service platforms
- Multi-step workflow automation
- Complex decision-making systems
- Interactive AI assistants
Swarm’s patterns provide a clear path to effectively organizing and managing your AI agents.
In the following sections, we’ll explore how to implement Swarm in practice, diving into three real-world projects demonstrating its power and flexibility.
Understanding the Core Concepts of Swarm
To grasp the power of Swarm, let’s break down its fundamental building blocks and understand how they work together to create sophisticated AI systems.
The Two Pillars: Routines and Handoffs
Understanding Routines
At its heart, a routine in Swarm is elegantly simple: it’s a combination of instructions (expressed as a system prompt) and the tools needed to execute those instructions. Think of it as a job description combined with the actual tools needed to do the job.
customer_service_agent = Agent(
name="Customer Service Agent",
instructions="You are a customer service agent. Help users with their issues.",
tools=[look_up_order, process_refund, escalate_to_manager]
)
What makes routines powerful is their ability to handle:
- Conditional Logic: Just like in programming, routines can include if-then statements and decision trees
- Natural Language Steps: Instructions are written in plain English, making them easy to understand and modify
- Flexible Execution: The AI can navigate the steps naturally, avoiding rigid state-machine limitations
The Magic of Handoffs
Handoffs are where Swarm truly shines. They enable one agent to transfer control to another agent while maintaining the full context of the conversation. It’s similar to how a customer service representative might transfer you to a specialist — except it happens seamlessly and maintains perfect memory of the conversation.
def transfer_to_sales():
"""Transfer to sales department for purchase-related queries"""
return sales_agent
Key Components in Practice
1. Agents
Agents are the workhorses of Swarm. Each agent is defined by:
- A unique name and identity
- Specific instructions (their “routine”)
- A set of tools they can use
- Optional model specifications
2. Function Calling
Swarm agents can interact with the real world through functions:
def process_refund(order_id: str, reason: str):
"""Process a refund for the given order"""
# Handle refund logic here
return f"Refund processed for order {order_id}"
These functions can:
- Execute real-world actions
- Query databases
- Update systems
- Trigger handoffs to other agents
3. Context Management
Swarm maintains conversation context throughout the entire interaction, even across different agents. This means:
- Previous conversation history is preserved
- Context variables can be passed between agents
- The state can be maintained across handoffs
4. Tool Integration
Tools in Swarm are Python functions that agents can call. They’re automatically converted into a format the AI can understand:
- Function signatures become parameter schemas
- Docstrings become function descriptions
- Return values can trigger agent transitions
How It All Comes Together
Let’s look at a simplified example of how these components work together:
# Define specialized agents
triage_agent = Agent(
name="Triage",
instructions="Direct users to the appropriate department",
tools=[transfer_to_sales, transfer_to_support]
)
sales_agent = Agent(
name="Sales",
instructions="Help users with purchases",
tools=[process_order, check_inventory]
)
# The flow just works naturally
User: "I want to buy a product"
Triage: Transfers to sales (via handoff)
Sales: Handles the purchase
The Power of This Approach
What makes Swarm’s approach particularly effective is:
- Modularity: Each agent can be developed and tested independently
- Scalability: New agents can be added without modifying existing ones
- Maintainability: Instructions and tools can be updated separately
- Flexibility: The system can handle complex flows naturally
- Clarity: Each agent’s role and capabilities are clearly defined
Best Practices for Working with Swarm
When implementing Swarm patterns, consider:
- Keep Instructions Focused: Each agent should have a clear, specific purpose
- Design Clear Handoff Conditions: Make it obvious when transfers should occur
- Maintain Tool Simplicity: Functions should do one thing well
- Use Context Wisely: Pass only necessary information between agents
- Test Edge Cases: Ensure smooth transitions in unexpected situations
Combined with Swarm’s other components, this foundation of routines and handoffs creates a powerful system for building sophisticated AI applications.
Project 1:
Building an Enterprise-Grade Support System with Swarm: A Deep Dive
In this tutorial, we’ll explore how to build a sophisticated support ticketing system using Swarm. This implementation demonstrates enterprise-level features including ticket management, customer validation, and multi-agent support handling.
System Architecture Overview
graph TD
A[Customer Request] --> B[SupportSystem]
B --> C[DatabaseManager]
B --> D[SupportFunctions]
B --> E[Agents]
E --> F[Triage Agent]
E --> G[Technical Support Agent]
E --> H[Billing Support Agent]
C --> I[(SQLite Database)]
D --> C
Core Components
1. Data Models and Enums
First, let’s define our core data structures:
class Priority(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class IssueCategory(Enum):
TECHNICAL = "technical"
BILLING = "billing"
ACCOUNT = "account"
PRODUCT = "product"
COMPLIANCE = "compliance"
@dataclass
class SLAConfig:
priority: Priority
response_time_minutes: int
resolution_time_minutes: int
2. Database Structure
The system uses SQLite with five main tables:
-- Customers table
CREATE TABLE customers (
customer_id TEXT PRIMARY KEY,
region TEXT NOT NULL,
tier TEXT NOT NULL,
language TEXT NOT NULL,
created_at TIMESTAMP
)
-- Products table
CREATE TABLE products (
product_id TEXT PRIMARY KEY,
product_name TEXT NOT NULL,
description TEXT,
created_at TIMESTAMP
)
-- Customer Entitlements table
CREATE TABLE customer_entitlements (
customer_id TEXT,
product_id TEXT,
start_date DATE,
end_date DATE,
FOREIGN KEY references
)
-- Support Tickets table
CREATE TABLE support_tickets (
ticket_id TEXT PRIMARY KEY,
customer_id TEXT,
category TEXT NOT NULL,
description TEXT NOT NULL,
priority TEXT NOT NULL,
status TEXT NOT NULL,
assigned_to TEXT,
created_at TIMESTAMP,
updated_at TIMESTAMP
)
-- Ticket History table
CREATE TABLE ticket_history (
id INTEGER PRIMARY KEY,
ticket_id TEXT,
action TEXT NOT NULL,
details TEXT,
created_at TIMESTAMP
)
3. Support Functions Implementation
The system provides key support functions:
class SupportFunctions:
def validate_customer(self, customer_id: str) -> Dict:
"""Validates customer and retrieves their information"""
# Checks customer exists and gets entitlements
def create_ticket(self, customer_id: str, category: str,
description: str, priority: str) -> Dict:
"""Creates a new support ticket"""
# Generates ticket ID and records in database
def route_ticket(self, ticket_id: str, team: str) -> Dict:
"""Routes ticket to specified team"""
# Updates ticket assignment and status
def escalate_ticket(self, ticket_id: str, reason: str) -> Dict:
"""Escalates a support ticket"""
# Updates ticket status and records escalation
4. Agent Configuration
The system uses three specialized agents:
def create_agents(support_functions: SupportFunctions):
triage_agent = Agent(
name="Triage Agent",
instructions="""You are the initial triage agent for customer support requests.
Your responsibilities:
1. Validate customer information
2. Determine issue category and priority
3. Create support ticket
4. Route to appropriate team""",
functions=[
support_functions.validate_customer,
support_functions.create_ticket,
support_functions.route_ticket
]
)
technical_support_agent = Agent(
name="Technical Support Agent",
instructions="""You are a technical support agent handling technical issues.
For each issue:
1. Review customer entitlements
2. Analyze technical details
3. Provide troubleshooting steps
4. Escalate if beyond scope""",
functions=[support_functions.escalate_ticket]
)
billing_support_agent = Agent(
name="Billing Support Agent",
instructions="""You are a billing support agent handling billing issues.
For each issue:
1. Verify transaction details
2. Check payment status
3. Process adjustments if needed
4. Escalate complex cases""",
functions=[support_functions.escalate_ticket]
)
return {
"triage": triage_agent,
"technical": technical_support_agent,
"billing": billing_support_agent
}
5. Main Support System Class
class SupportSystem:
def __init__(self):
self.db_manager = DatabaseManager()
self.support_functions = SupportFunctions(self.db_manager)
self.agents = create_agents(self.support_functions)
self.client = Swarm()
self.current_agent = self.agents["triage"]
self.context = {}
def handle_request(self, customer_id: str, message: str):
"""Handles incoming support requests"""
messages = [
{
"role": "system",
"content": f"Customer ID: {customer_id}\nInitial Request: {message}"
},
{
"role": "user",
"content": message
}
]
response = self.client.run(
agent=self.current_agent,
messages=messages,
context_variables=self.context
)
self.current_agent = response.agent
self.context = response.context_variables
return response.messages[-1]["content"]
Running the System
1. Setup and Installation
# Create virtual environment
python -m venv venv
source venv/bin/activate # or venv\Scripts\activate on Windows
# Install requirements
pip install openai python-dotenv rich pandas sqlite3
# Create .env file
touch .env
Add to .env:
OPENAI_API_KEY=your_api_key_here
2. Initialize and Run
if __name__ == "__main__":
support_system = SupportSystem()
support_system.initialize_poc() # Loads sample data
# Start handling requests
run_support_system()
3. Testing the System
Here’s a sample test script:
def test_support_system():
system = SupportSystem()
system.initialize_poc()
# Test customer validation
response = system.handle_request("CUST001", "Need help with login")
assert "ticket" in response.lower()
# Test ticket creation
response = system.handle_request(
"CUST001",
"Technical issue with Enterprise Suite"
)
assert "TICKET" in response
# Test ticket routing
system.display_tickets() # Should show routed ticket
Example Usage
$ python support_system.py
Support System Initialized
Available commands:
1. 'tickets' to display current tickets
2. 'quit' to exit
3. Any other input will be treated as a support request
Enter customer ID: CUST001
Enter your support request: I can't access the Enterprise Suite
Agent Response: I'll help you with your access issue. Let me create a ticket...
Key Features
- Robust Database Management
- Customer tracking
- Product management
- Entitlement tracking
- Ticket history
2. Multi-Agent Support
- Triage
- Technical support
- Billing support
- Seamless handoffs
3. Comprehensive Ticket Management
- Priority levels
- Categories
- Status tracking
- History logging
4. Enterprise Features
- SLA tracking
- Escalation handling
- Customer entitlement validation
- Regional support
#full python script
# support_system.py
import datetime
import sqlite3
from enum import Enum
from typing import Dict, List, Optional
from dataclasses import dataclass
import logging
from pathlib import Path
import pandas as pd
from rich.console import Console
from rich.table import Table
from dotenv import load_dotenv
import os
# Import only Swarm and Agent from swarm package
from swarm import Swarm, Agent
# Load environment variables from .env file
load_dotenv()
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
console = Console()
# --- Enums and Data Classes ---
class Priority(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class IssueCategory(Enum):
TECHNICAL = "technical"
BILLING = "billing"
ACCOUNT = "account"
PRODUCT = "product"
COMPLIANCE = "compliance"
@dataclass
class SLAConfig:
priority: Priority
response_time_minutes: int
resolution_time_minutes: int
# --- Database Management ---
class DatabaseManager:
def __init__(self, db_path: str = "support_system.db"):
self.db_path = db_path
self.init_database()
def get_connection(self):
return sqlite3.connect(self.db_path)
def init_database(self):
"""Initialize database with required tables"""
with self.get_connection() as conn:
cursor = conn.cursor()
# Create Customers table
cursor.execute("""
CREATE TABLE IF NOT EXISTS customers (
customer_id TEXT PRIMARY KEY,
region TEXT NOT NULL,
tier TEXT NOT NULL,
language TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Create Products table
cursor.execute("""
CREATE TABLE IF NOT EXISTS products (
product_id TEXT PRIMARY KEY,
product_name TEXT NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Create Customer Entitlements table
cursor.execute("""
CREATE TABLE IF NOT EXISTS customer_entitlements (
customer_id TEXT,
product_id TEXT,
start_date DATE,
end_date DATE,
FOREIGN KEY (customer_id) REFERENCES customers (customer_id),
FOREIGN KEY (product_id) REFERENCES products (product_id)
)
""")
# Create Support Tickets table
cursor.execute("""
CREATE TABLE IF NOT EXISTS support_tickets (
ticket_id TEXT PRIMARY KEY,
customer_id TEXT,
category TEXT NOT NULL,
description TEXT NOT NULL,
priority TEXT NOT NULL,
status TEXT NOT NULL,
assigned_to TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (customer_id) REFERENCES customers (customer_id)
)
""")
# Create Ticket History table
cursor.execute("""
CREATE TABLE IF NOT EXISTS ticket_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticket_id TEXT,
action TEXT NOT NULL,
details TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (ticket_id) REFERENCES support_tickets (ticket_id)
)
""")
conn.commit()
def load_sample_data(self):
"""Load sample data for POC"""
with self.get_connection() as conn:
cursor = conn.cursor()
# Sample Customers
customers = [
("CUST001", "AMERICAS", "ENTERPRISE", "en"),
("CUST002", "EMEIA", "SMB", "fr"),
("CUST003", "APAC", "ENTERPRISE", "ja"),
]
cursor.executemany(
"INSERT OR REPLACE INTO customers (customer_id, region, tier, language) VALUES (?, ?, ?, ?)",
customers
)
# Sample Products
products = [
("PROD001", "Enterprise Suite", "Full enterprise solution"),
("PROD002", "Analytics Platform", "Data analytics solution"),
("PROD003", "Security Suite", "Enterprise security package"),
]
cursor.executemany(
"INSERT OR REPLACE INTO products (product_id, product_name, description) VALUES (?, ?, ?)",
products
)
# Sample Entitlements
entitlements = [
("CUST001", "PROD001", "2024-01-01", "2024-12-31"),
("CUST001", "PROD002", "2024-01-01", "2024-12-31"),
("CUST002", "PROD001", "2024-01-01", "2024-12-31"),
("CUST003", "PROD001", "2024-01-01", "2024-12-31"),
("CUST003", "PROD002", "2024-01-01", "2024-12-31"),
("CUST003", "PROD003", "2024-01-01", "2024-12-31"),
]
cursor.executemany(
"""INSERT OR REPLACE INTO customer_entitlements
(customer_id, product_id, start_date, end_date)
VALUES (?, ?, ?, ?)""",
entitlements
)
conn.commit()
# --- Support Functions ---
class SupportFunctions:
def __init__(self, db_manager: DatabaseManager):
self.db = db_manager
def validate_customer(self, customer_id: str) -> Dict:
"""Validate customer and retrieve their information."""
with self.db.get_connection() as conn:
cursor = conn.cursor()
# Make the query case-insensitive
cursor.execute(
"SELECT * FROM customers WHERE UPPER(customer_id) = UPPER(?)",
(customer_id,)
)
customer = cursor.fetchone()
if not customer:
return {"status": "error", "message": f"Customer {customer_id} not found"}
# Get customer entitlements
cursor.execute("""
SELECT p.product_name
FROM customer_entitlements ce
JOIN products p ON ce.product_id = p.product_id
WHERE UPPER(ce.customer_id) = UPPER(?) AND ce.end_date >= date('now')
""", (customer_id,))
entitlements = [row[0] for row in cursor.fetchall()]
return {
"status": "success",
"customer": {
"customer_id": customer[0],
"region": customer[1],
"tier": customer[2],
"language": customer[3],
"entitlements": entitlements
}
}
def create_ticket(self, customer_id: str, category: str, description: str, priority: str) -> Dict:
"""Create a new support ticket."""
logger.info(f"Attempting to create ticket for customer {customer_id}")
logger.info(f"Category: {category}, Priority: {priority}")
with self.db.get_connection() as conn:
cursor = conn.cursor()
# Generate ticket ID
cursor.execute("SELECT COUNT(*) FROM support_tickets")
count = cursor.fetchone()[0]
ticket_id = f"TICKET_{count + 1:04d}"
try:
# Create ticket
cursor.execute("""
INSERT INTO support_tickets
(ticket_id, customer_id, category, description, priority, status)
VALUES (?, ?, ?, ?, ?, ?)
""", (ticket_id, customer_id, category, description, priority, "new"))
# Add to history
cursor.execute("""
INSERT INTO ticket_history (ticket_id, action, details)
VALUES (?, ?, ?)
""", (ticket_id, "CREATED", f"Ticket created with priority {priority}"))
conn.commit()
logger.info(f"Successfully created ticket {ticket_id}")
return {"status": "success", "ticket_id": ticket_id}
except sqlite3.Error as e:
logger.error(f"Database error while creating ticket: {e}")
return {"status": "error", "message": str(e)}
def route_ticket(self, ticket_id: str, team: str) -> Dict:
"""Route ticket to specified team."""
with self.db.get_connection() as conn:
cursor = conn.cursor()
try:
cursor.execute("""
UPDATE support_tickets
SET assigned_to = ?, status = 'assigned', updated_at = CURRENT_TIMESTAMP
WHERE ticket_id = ?
""", (team, ticket_id))
cursor.execute("""
INSERT INTO ticket_history (ticket_id, action, details)
VALUES (?, ?, ?)
""", (ticket_id, "ROUTED", f"Ticket assigned to {team}"))
conn.commit()
return {"status": "success", "message": f"Ticket routed to {team}"}
except sqlite3.Error as e:
logger.error(f"Database error: {e}")
return {"status": "error", "message": str(e)}
def escalate_ticket(self, ticket_id: str, reason: str) -> Dict:
"""Escalate a support ticket."""
with self.db.get_connection() as conn:
cursor = conn.cursor()
try:
cursor.execute("""
UPDATE support_tickets
SET status = 'escalated', updated_at = CURRENT_TIMESTAMP
WHERE ticket_id = ?
""", (ticket_id,))
cursor.execute("""
INSERT INTO ticket_history (ticket_id, action, details)
VALUES (?, ?, ?)
""", (ticket_id, "ESCALATED", reason))
conn.commit()
return {"status": "success", "message": f"Ticket {ticket_id} escalated"}
except sqlite3.Error as e:
logger.error(f"Database error: {e}")
return {"status": "error", "message": str(e)}
# --- Agent Definitions ---
def create_agents(support_functions: SupportFunctions):
triage_agent = Agent(
name="Triage Agent",
instructions="""You are the initial triage agent for customer support requests.
Your responsibilities:
1. Validate customer information
2. Determine issue category and priority
3. Create support ticket
4. Route to appropriate team
Always collect:
- Customer ID
- Issue description
- Category (technical/billing/account/product/compliance)
- Priority (low/medium/high/critical)
Ask for missing information in a single message.""",
functions=[
support_functions.validate_customer,
support_functions.create_ticket,
support_functions.route_ticket
]
)
technical_support_agent = Agent(
name="Technical Support Agent",
instructions="""You are a technical support agent handling technical issues.
For each issue:
1. Review customer entitlements
2. Analyze technical details
3. Provide troubleshooting steps
4. Escalate if beyond scope""",
functions=[support_functions.escalate_ticket]
)
billing_support_agent = Agent(
name="Billing Support Agent",
instructions="""You are a billing support agent handling billing issues.
For each issue:
1. Verify transaction details
2. Check payment status
3. Process adjustments if needed
4. Escalate complex cases""",
functions=[support_functions.escalate_ticket]
)
return {
"triage": triage_agent,
"technical": technical_support_agent,
"billing": billing_support_agent
}
# --- Support System Class ---
class SupportSystem:
def __init__(self):
self.db_manager = DatabaseManager()
self.support_functions = SupportFunctions(self.db_manager)
self.agents = create_agents(self.support_functions)
self.client = Swarm()
self.current_agent = self.agents["triage"]
self.context = {}
def initialize_poc(self):
"""Initialize POC with sample data"""
self.db_manager.load_sample_data()
logger.info("POC data initialized")
def handle_request(self, customer_id: str, message: str):
"""Handle incoming support request"""
messages = [
{
"role": "system",
"content": f"Customer ID: {customer_id}\nInitial Request: {message}"
},
{
"role": "user",
"content": message
}
]
response = self.client.run(
agent=self.current_agent,
messages=messages,
context_variables=self.context
)
self.current_agent = response.agent
self.context = response.context_variables
return response.messages[-1]["content"]
def display_tickets(self):
"""Display current tickets in a formatted table"""
with self.db_manager.get_connection() as conn:
df = pd.read_sql_query("SELECT * FROM support_tickets", conn)
table = Table(title="Current Support Tickets")
for column in df.columns:
table.add_column(column)
for _, row in df.iterrows():
table.add_row(*[str(value) for value in row])
console.print(table)
# --- Main Execution ---
def run_support_system():
support_system = SupportSystem()
support_system.initialize_poc()
console.print("[bold green]Support System Initialized[/bold green]")
console.print("[bold blue]Available commands:[/bold blue]")
console.print("1. 'tickets' to display current tickets")
console.print("2. 'quit' to exit")
console.print("3. Any other input will be treated as a support request")
while True:
command = input("\nEnter command or customer ID: ").strip().lower()
if command == 'quit':
break
elif command == 'tickets':
support_system.display_tickets()
continue
customer_id = command
message = input("Enter your support request: ")
try:
response = support_system.handle_request(customer_id, message)
console.print(f"\n[green]Agent Response:[/green] {response}\n")
except Exception as e:
logger.error(f"Error processing request: {str(e)}")
console.print(f"[red]Error processing request: {str(e)}[/red]")
if __name__ == "__main__":
run_support_system()
Create a Streamlit app for the above:
# app.py
# Import statements
import streamlit as st
# Must be the first Streamlit command!
st.set_page_config(
page_title="Customer Support System",
page_icon="🎫",
layout="wide",
initial_sidebar_state="expanded"
)
import pandas as pd
from datetime import datetime
import plotly.express as px
from dotenv import load_dotenv
from support_system import SupportSystem
# Load environment variables from .env file at startup
load_dotenv()
# Custom CSS for light blue theme and better styling
st.markdown("""
<style>
/* Main container */
.main {
background-color: #f0f8ff;
}
/* Headers */
.css-10trblm {
color: #1E88E5;
font-weight: 600;
}
/* Metrics */
.css-1r6slb0 {
background-color: #E3F2FD;
border-radius: 10px;
padding: 10px;
box-shadow: 2px 2px 5px rgba(0,0,0,0.1);
}
/* Cards */
.card {
background-color: white;
padding: 20px;
border-radius: 10px;
box-shadow: 2px 2px 5px rgba(0,0,0,0.1);
margin: 10px 0;
}
/* Form inputs */
.stTextInput, .stSelectbox, .stTextArea {
background-color: white;
border-radius: 5px;
}
/* Dataframes */
.dataframe {
background-color: white;
border-radius: 10px;
padding: 10px;
}
</style>
""", unsafe_allow_html=True)
# Initialize support system
@st.cache_resource
def init_support_system():
try:
system = SupportSystem()
system.initialize_poc()
return system
except Exception as e:
st.error(f"Error initializing support system: {str(e)}")
st.stop()
# Initialize session state
if 'support_system' not in st.session_state:
st.session_state.support_system = init_support_system()
# Sidebar with improved styling
with st.sidebar:
st.title("🎯 Navigation")
st.markdown("---")
page = st.radio(
"Go to",
["📊 Dashboard", "🎫 Create Ticket", "📋 View Tickets", "👤 Customer Info"]
)
# Main content
st.title("🏢 Customer Support System")
st.markdown("---")
def create_dashboard():
st.header("📊 Support Dashboard")
st.markdown("*Real-time overview of support operations*")
# Get current tickets data
with st.session_state.support_system.db_manager.get_connection() as conn:
tickets_df = pd.read_sql_query("SELECT * FROM support_tickets", conn)
customers_df = pd.read_sql_query("SELECT * FROM customers", conn)
# Create three columns for key metrics with improved styling
col1, col2, col3 = st.columns(3)
with col1:
st.markdown("""
<div class="card">
<h3>📈 Total Tickets</h3>
<h2>{}</h2>
</div>
""".format(len(tickets_df)), unsafe_allow_html=True)
with col2:
st.markdown("""
<div class="card">
<h3>🔓 Open Tickets</h3>
<h2>{}</h2>
</div>
""".format(len(tickets_df[tickets_df['status'] != 'closed'])), unsafe_allow_html=True)
with col3:
st.markdown("""
<div class="card">
<h3>👥 Total Customers</h3>
<h2>{}</h2>
</div>
""".format(len(customers_df)), unsafe_allow_html=True)
st.markdown("---")
# Create visualizations with improved styling
col1, col2 = st.columns(2)
with col1:
fig_priority = px.pie(
tickets_df,
names='priority',
title='🎯 Tickets by Priority',
color_discrete_sequence=px.colors.sequential.Blues
)
fig_priority.update_layout(
plot_bgcolor='rgba(0,0,0,0)',
paper_bgcolor='rgba(0,0,0,0)'
)
st.plotly_chart(fig_priority, use_container_width=True)
with col2:
fig_category = px.bar(
tickets_df.groupby('category').size().reset_index(name='count'),
x='category',
y='count',
title='📊 Tickets by Category',
color_discrete_sequence=px.colors.sequential.Blues
)
fig_category.update_layout(
plot_bgcolor='rgba(0,0,0,0)',
paper_bgcolor='rgba(0,0,0,0)'
)
st.plotly_chart(fig_category, use_container_width=True)
def create_ticket_form():
st.header("🎫 Create Support Ticket")
st.markdown("*Create a new support ticket for customer issues*")
with st.form("ticket_form", clear_on_submit=True):
st.markdown("""
<div class="card">
""", unsafe_allow_html=True)
# Customer ID input with icon
st.markdown("👤 **Customer Information**")
customer_id = st.text_input("Customer ID").upper()
# Category selection with icon
st.markdown("🏷️ **Ticket Details**")
category = st.selectbox(
"Category",
options=[
"technical",
"billing",
"account",
"product",
"compliance"
]
)
# Priority selection with icon
st.markdown("🎯 **Priority Level**")
priority = st.selectbox(
"Priority",
options=[
"low",
"medium",
"high",
"critical"
]
)
# Description input with icon
st.markdown("📝 **Issue Description**")
description = st.text_area("Describe the issue in detail")
st.markdown("</div>", unsafe_allow_html=True)
# Submit button with styling
submitted = st.form_submit_button("🚀 Create Ticket")
if submitted:
if not all([customer_id, category, description]):
st.error("❌ Please fill in all fields")
return
# Validate customer
with st.spinner("🔍 Validating customer..."):
validation = st.session_state.support_system.support_functions.validate_customer(customer_id)
if validation["status"] == "error":
st.error(f"❌ {validation['message']}")
return
# Create ticket
with st.spinner("📝 Creating ticket..."):
result = st.session_state.support_system.support_functions.create_ticket(
customer_id=customer_id,
category=category,
description=description,
priority=priority
)
if result["status"] == "success":
st.success(f"✅ Ticket created successfully! Ticket ID: {result['ticket_id']}")
else:
st.error(f"❌ Error creating ticket: {result['message']}")
def view_tickets():
st.header("📋 View Support Tickets")
st.markdown("*Browse and filter all support tickets*")
# Get tickets data
with st.spinner("📥 Loading tickets..."):
with st.session_state.support_system.db_manager.get_connection() as conn:
tickets_df = pd.read_sql_query("""
SELECT t.*, c.region, c.tier
FROM support_tickets t
LEFT JOIN customers c ON t.customer_id = c.customer_id
""", conn)
# Add filters with icons
st.markdown("🔍 **Filter Tickets**")
col1, col2, col3 = st.columns(3)
with col1:
status_filter = st.multiselect(
"🔵 Status",
options=tickets_df['status'].unique()
)
with col2:
priority_filter = st.multiselect(
"🎯 Priority",
options=tickets_df['priority'].unique()
)
with col3:
category_filter = st.multiselect(
"🏷️ Category",
options=tickets_df['category'].unique()
)
# Apply filters
filtered_df = tickets_df.copy()
if status_filter:
filtered_df = filtered_df[filtered_df['status'].isin(status_filter)]
if priority_filter:
filtered_df = filtered_df[filtered_df['priority'].isin(priority_filter)]
if category_filter:
filtered_df = filtered_df[filtered_df['category'].isin(category_filter)]
# Display tickets with styling
st.markdown("📊 **Tickets Overview**")
st.dataframe(
filtered_df,
column_config={
"created_at": st.column_config.DatetimeColumn("📅 Created At"),
"updated_at": st.column_config.DatetimeColumn("🔄 Updated At")
},
hide_index=True
)
def view_customer_info():
st.header("👤 Customer Information")
st.markdown("*View detailed customer information and history*")
customer_id = st.text_input("🔍 Enter Customer ID").upper()
if customer_id:
with st.spinner("🔍 Looking up customer..."):
validation = st.session_state.support_system.support_functions.validate_customer(customer_id)
if validation["status"] == "success":
customer = validation["customer"]
# Create two columns for customer info
col1, col2 = st.columns(2)
with col1:
st.markdown("""
<div class="card">
<h3>📋 Customer Details</h3>
""", unsafe_allow_html=True)
st.write(f"🆔 **Customer ID:** {customer['customer_id']}")
st.write(f"🌎 **Region:** {customer['region']}")
st.write(f"⭐ **Tier:** {customer['tier']}")
st.write(f"🗣️ **Language:** {customer['language']}")
st.markdown("</div>", unsafe_allow_html=True)
with col2:
st.markdown("""
<div class="card">
<h3>✨ Entitlements</h3>
""", unsafe_allow_html=True)
for entitlement in customer['entitlements']:
st.write(f"📦 {entitlement}")
st.markdown("</div>", unsafe_allow_html=True)
# Show customer's tickets
st.subheader("🎫 Customer Tickets")
with st.session_state.support_system.db_manager.get_connection() as conn:
customer_tickets = pd.read_sql_query(
"SELECT * FROM support_tickets WHERE customer_id = ?",
conn,
params=(customer_id,)
)
if not customer_tickets.empty:
st.dataframe(
customer_tickets,
column_config={
"created_at": st.column_config.DatetimeColumn("📅 Created At"),
"updated_at": st.column_config.DatetimeColumn("🔄 Updated At")
},
hide_index=True
)
else:
st.info("📭 No tickets found for this customer")
else:
st.error(f"❌ {validation['message']}")
# Route to appropriate page based on selection
if page == "📊 Dashboard":
create_dashboard()
elif page == "🎫 Create Ticket":
create_ticket_form()
elif page == "📋 View Tickets":
view_tickets()
else: # Customer Info
view_customer_info()
Need to fix this one . Some functionalities are not working.
Project: 2
Building an AI-Powered Sales Pipeline Management System with Swarm
In this tutorial, we’ll explore how to build a comprehensive sales pipeline management system using Swarm. This system demonstrates enterprise-level sales process automation with multiple specialized agents handling different stages of the sales cycle.
System Architecture
graph TD
A[Sales Request] --> B[SalesPipelineSystem]
B --> C[Database Manager]
B --> D[Sales Functions]
B --> E[Specialized Agents]
E --> F[Lead Qualification Agent]
E --> G[Sales Strategy Agent]
E --> H[Product Specialist Agent]
E --> I[Deal Closure Agent]
E --> J[Documentation Agent]
C --> K[(SQLite Database)]
D --> C
Core Components
1. Data Models and Enumerations
The system uses strongly-typed enumerations for consistent data handling:
class LeadStatus(Enum):
NEW = "new"
QUALIFIED = "qualified"
NEGOTIATING = "negotiating"
CLOSED_WON = "closed_won"
CLOSED_LOST = "closed_lost"
class LeadPriority(Enum):
HOT = "hot"
WARM = "warm"
COLD = "cold"
class Industry(Enum):
TECHNOLOGY = "technology"
HEALTHCARE = "healthcare"
FINANCE = "finance"
RETAIL = "retail"
MANUFACTURING = "manufacturing"
OTHER = "other"
2. Database Schema
The system uses five interconnected tables:
-- Leads Table
CREATE TABLE leads (
lead_id TEXT PRIMARY KEY,
company_name TEXT NOT NULL,
industry TEXT NOT NULL,
contact_name TEXT NOT NULL,
contact_email TEXT NOT NULL,
contact_phone TEXT,
annual_revenue FLOAT,
employee_count INTEGER,
status TEXT NOT NULL,
priority TEXT NOT NULL
);
-- Products Table
CREATE TABLE products (
product_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
base_price FLOAT NOT NULL,
min_discount FLOAT,
max_discount FLOAT
);
-- Opportunities Table
CREATE TABLE opportunities (
opportunity_id TEXT PRIMARY KEY,
lead_id TEXT NOT NULL,
product_id TEXT NOT NULL,
estimated_value FLOAT NOT NULL,
probability FLOAT NOT NULL,
expected_close_date DATE,
status TEXT NOT NULL
);
-- Activities Table
CREATE TABLE activities (
activity_id TEXT PRIMARY KEY,
lead_id TEXT NOT NULL,
activity_type TEXT NOT NULL,
description TEXT NOT NULL,
next_action_date DATE
);
-- Deal Documents Table
CREATE TABLE deal_documents (
document_id TEXT PRIMARY KEY,
opportunity_id TEXT NOT NULL,
change_type TEXT NOT NULL,
details TEXT NOT NULL,
version INTEGER NOT NULL
);
3. Specialized Sales Functions
The system implements key sales operations:
class SalesFunctions:
def qualify_lead(self, lead_id: str, company_info: str) -> Dict:
"""
Qualifies leads using a scoring system:
- Revenue > 1M: +3 points
- Revenue > 500K: +2 points
- Employees > 100: +2 points
- Employees > 50: +1 point
- Key industries (tech/finance): +2 points
"""
def create_opportunity(self, lead_id: str, product_id: str,
estimated_value: float) -> Dict:
"""Creates and tracks new sales opportunities"""
def generate_proposal(self, opportunity_id: str) -> Dict:
"""Generates customized sales proposals"""
def close_deal(self, opportunity_id: str, status: str,
final_value: float = None) -> Dict:
"""Handles deal closure and updates all related records"""
4. Specialized Sales Agents
The system employs five specialized agents:
def create_agents(sales_functions: SalesFunctions):
# Lead Qualification Agent
lead_qualification_agent = Agent(
name="Lead Qualification Agent",
instructions="""You are a lead qualification agent responsible for:
1. Analyzing new leads
2. Scoring leads based on criteria
3. Determining lead priority
4. Qualifying leads for sales process""",
functions=[sales_functions.qualify_lead]
)
# Sales Strategy Agent
sales_strategy_agent = Agent(
name="Sales Strategy Agent",
instructions="""You are a sales strategy agent responsible for:
1. Analyzing qualified leads
2. Recommending products
3. Creating opportunities
4. Developing sales approaches""",
functions=[sales_functions.create_opportunity]
)
# Product Specialist Agent
product_specialist_agent = Agent(
name="Product Specialist Agent",
instructions="""You are a product specialist agent responsible for:
1. Providing detailed product information
2. Answering technical questions
3. Suggesting product configurations
4. Supporting proposal generation""",
functions=[sales_functions.generate_proposal]
)
# Deal Closure Agent
deal_closure_agent = Agent(
name="Deal Closure Agent",
instructions="""You are a deal closure agent responsible for:
1. Negotiating final terms
2. Handling objections
3. Closing deals
4. Recording outcomes""",
functions=[sales_functions.close_deal]
)
# Documentation Agent
documentation_agent = Agent(
name="Documentation Agent",
instructions="""You are a documentation agent responsible for:
1. Generating sales documentation
2. Creating and maintaining proposals
3. Tracking deal changes and versions
4. Managing compliance requirements""",
functions=[sales_functions.generate_proposal,
sales_functions.document_deal_changes]
)
return {
"qualification": lead_qualification_agent,
"strategy": sales_strategy_agent,
"product": product_specialist_agent,
"closure": deal_closure_agent,
"documentation": documentation_agent
}
1. Setup and Installation
# Create virtual environment
python -m venv venv
source venv/bin/activate # or venv\Scripts\activate on Windows
# Install requirements
pip install openai python-dotenv rich pandas sqlite3
# Create .env file
touch .env
Add to .env:
OPENAI_API_KEY=your_api_key_here
2. Initialize and Run
if __name__ == "__main__":
pipeline_system = SalesPipelineSystem()
pipeline_system.initialize_poc()
run_sales_pipeline()
3. Available Commands
The system supports various commands:
1. 'pipeline' - Display current sales pipeline
2. 'qualify <lead_id>' - Qualify a new lead
3. 'strategy <lead_id>' - Develop sales strategy
4. 'product <lead_id>' - Get product specialist
5. 'close <lead_id>' - Work on deal closure
6. 'docs <lead_id>' - Handle documentation
7. 'quit' - Exit system
Example Usage Scenarios
1. Lead Qualification
> qualify LEAD001
Enter your request: Analyze Tech Corp for qualification
Agent Response: Let me analyze Tech Corp's profile...
- Annual Revenue: $1M (+3 points)
- Employees: 100 (+2 points)
- Industry: Technology (+2 points)
Total Score: 7 points
Lead qualified as HOT priority
2. Creating an Opportunity
> strategy LEAD001
Enter your request: Create opportunity for Enterprise Suite
Agent Response: Creating opportunity...
- Product: Enterprise Suite
- Estimated Value: $50,000
- Initial Probability: 20%
Opportunity created: OPP_20240223123456
3. Generating a Proposal
> product OPP_20240223123456
Enter your request: Generate proposal for Tech Corp
Agent Response: Generating customized proposal...
- Company: Tech Corp
- Product: Enterprise Suite
- Base Price: $50,000
- Valid Until: 2024-03-23
Proposal generated and saved
Best Practices and Extensions
1. Lead Management
- Regularly update lead scores
- Track all lead interactions
- Maintain detailed activity logs
- Set follow-up reminders
2. Opportunity Tracking
- Monitor pipeline velocity
- Track conversion rates
- Update probabilities regularly
- Document all changes
3. Deal Documentation
- Use version control for documents
- Maintain audit trails
- Track all proposal versions
- Document negotiation history
4. System Extensions
- Add email integration
- Implement CRM sync
- Add reporting dashboard
- Include forecasting tools
#full python script
# Sales_Pipeline_Management_System.py
import datetime
import sqlite3
from enum import Enum
from typing import Dict, List, Optional
from dataclasses import dataclass
import logging
from pathlib import Path
import pandas as pd
from rich.console import Console
from rich.table import Table
from dotenv import load_dotenv
import os
from swarm import Swarm, Agent
# Load environment variables from .env file
load_dotenv()
# Set up logging and console
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
console = Console()
# --- Enums and Data Classes ---
class LeadStatus(Enum):
NEW = "new"
QUALIFIED = "qualified"
NEGOTIATING = "negotiating"
CLOSED_WON = "closed_won"
CLOSED_LOST = "closed_lost"
class LeadPriority(Enum):
HOT = "hot"
WARM = "warm"
COLD = "cold"
class Industry(Enum):
TECHNOLOGY = "technology"
HEALTHCARE = "healthcare"
FINANCE = "finance"
RETAIL = "retail"
MANUFACTURING = "manufacturing"
OTHER = "other"
@dataclass
class SalesTarget:
revenue_target: float
leads_target: int
conversion_target: float
quarter: str
year: int
# --- Database Management ---
class DatabaseManager:
def __init__(self, db_path: str = "sales_pipeline.db"):
self.db_path = db_path
self.init_database()
def get_connection(self):
return sqlite3.connect(self.db_path)
def init_database(self):
"""Initialize database with required tables"""
with self.get_connection() as conn:
cursor = conn.cursor()
# Create Leads table
cursor.execute("""
CREATE TABLE IF NOT EXISTS leads (
lead_id TEXT PRIMARY KEY,
company_name TEXT NOT NULL,
industry TEXT NOT NULL,
contact_name TEXT NOT NULL,
contact_email TEXT NOT NULL,
contact_phone TEXT,
annual_revenue FLOAT,
employee_count INTEGER,
status TEXT NOT NULL,
priority TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Create Products table
cursor.execute("""
CREATE TABLE IF NOT EXISTS products (
product_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
base_price FLOAT NOT NULL,
min_discount FLOAT,
max_discount FLOAT
)
""")
# Create Opportunities table
cursor.execute("""
CREATE TABLE IF NOT EXISTS opportunities (
opportunity_id TEXT PRIMARY KEY,
lead_id TEXT NOT NULL,
product_id TEXT NOT NULL,
estimated_value FLOAT NOT NULL,
probability FLOAT NOT NULL,
expected_close_date DATE,
status TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (lead_id) REFERENCES leads (lead_id),
FOREIGN KEY (product_id) REFERENCES products (product_id)
)
""")
# Create Activities table
cursor.execute("""
CREATE TABLE IF NOT EXISTS activities (
activity_id TEXT PRIMARY KEY,
lead_id TEXT NOT NULL,
activity_type TEXT NOT NULL,
description TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
next_action_date DATE,
FOREIGN KEY (lead_id) REFERENCES leads (lead_id)
)
""")
conn.commit()
def load_sample_data(self):
"""Load sample data for POC"""
with self.get_connection() as conn:
cursor = conn.cursor()
# Sample Products
products = [
("PROD001", "Enterprise Suite", "Complete business solution", 50000.00, 0.10, 0.25),
("PROD002", "Professional Package", "Mid-tier solution", 25000.00, 0.05, 0.15),
("PROD003", "Starter Bundle", "Entry-level package", 10000.00, 0.00, 0.10),
]
cursor.executemany(
"""INSERT OR REPLACE INTO products
(product_id, name, description, base_price, min_discount, max_discount)
VALUES (?, ?, ?, ?, ?, ?)""",
products
)
# Sample Leads
leads = [
("LEAD001", "Tech Corp", "technology", "John Smith", "john@techcorp.com",
"+1234567890", 1000000.00, 100, "new", "hot"),
("LEAD002", "Health Plus", "healthcare", "Sarah Johnson", "sarah@healthplus.com",
"+1234567891", 500000.00, 50, "qualified", "warm"),
("LEAD003", "Finance Pro", "finance", "Mike Wilson", "mike@financepro.com",
"+1234567892", 2000000.00, 200, "negotiating", "hot"),
]
cursor.executemany(
"""INSERT OR REPLACE INTO leads
(lead_id, company_name, industry, contact_name, contact_email,
contact_phone, annual_revenue, employee_count, status, priority)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
leads
)
conn.commit()
# --- Sales Functions ---
class SalesFunctions:
def __init__(self, db_manager: DatabaseManager):
self.db = db_manager
def qualify_lead(self, lead_id: str, company_info: str) -> Dict:
"""Qualify a lead based on company information and scoring criteria."""
with self.db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT annual_revenue, employee_count, industry FROM leads WHERE UPPER(lead_id) = UPPER(?)",
(lead_id,)
)
lead_data = cursor.fetchone()
if not lead_data:
return {
"status": "error",
"message": f"Lead {lead_id} not found. Available leads: {self._get_available_leads()}"
}
revenue, employees, industry = lead_data
# Simple scoring logic
score = 0
if revenue > 1000000:
score += 3
elif revenue > 500000:
score += 2
if employees > 100:
score += 2
elif employees > 50:
score += 1
if industry.lower() in ['technology', 'finance']: score += 2
priority = "hot" if score >= 5 else "warm" if score >= 3 else "cold"
cursor.execute("""
UPDATE leads
SET priority = ?, status = 'qualified', updated_at = CURRENT_TIMESTAMP
WHERE UPPER(lead_id) = UPPER(?)
""", (priority, lead_id))
cursor.execute("""
INSERT INTO activities (activity_id, lead_id, activity_type, description)
VALUES (?, ?, ?, ?)
""", (f"ACT_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}",
lead_id, "QUALIFICATION", f"Lead qualified with priority: {priority}")
)
conn.commit()
return {
"status": "success",
"priority": priority,
"score": score,
"message": f"Lead qualified with priority {priority}"
}
def _get_available_leads(self) -> str:
"""Helper function to get list of available leads"""
with self.db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT lead_id FROM leads")
leads = cursor.fetchall()
return ", ".join([lead[0] for lead in leads])
def create_opportunity(self, lead_id: str, product_id: str, estimated_value: float) -> Dict:
"""Create a new sales opportunity."""
with self.db.get_connection() as conn:
cursor = conn.cursor()
opportunity_id = f"OPP_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"
try:
cursor.execute("""
INSERT INTO opportunities
(opportunity_id, lead_id, product_id, estimated_value, probability, status)
VALUES (?, ?, ?, ?, ?, ?)
""", (opportunity_id, lead_id, product_id, estimated_value, 0.2, "new"))
cursor.execute("""
UPDATE leads
SET status = 'negotiating', updated_at = CURRENT_TIMESTAMP
WHERE UPPER(lead_id) = UPPER(?)
""", (lead_id,))
conn.commit()
return {"status": "success", "opportunity_id": opportunity_id}
except sqlite3.Error as e:
logger.error(f"Database error: {e}")
return {"status": "error", "message": str(e)}
def generate_proposal(self, opportunity_id: str) -> Dict:
"""Generate a sales proposal for an opportunity."""
with self.db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT o.*, l.company_name, l.contact_name, p.name, p.description, p.base_price
FROM opportunities o
JOIN leads l ON o.lead_id = l.lead_id
JOIN products p ON o.product_id = p.product_id
WHERE o.opportunity_id = ?
""", (opportunity_id,))
opp_data = cursor.fetchone()
if not opp_data:
return {"status": "error", "message": "Opportunity not found"}
# Generate proposal details
proposal = {
"opportunity_id": opportunity_id,
"company_name": opp_data[9],
"contact_name": opp_data[10],
"product_name": opp_data[11],
"product_description": opp_data[12],
"base_price": opp_data[13],
"estimated_value": opp_data[3],
"valid_until": (datetime.datetime.now() + datetime.timedelta(days=30)).strftime('%Y-%m-%d')
}
return {"status": "success", "proposal": proposal}
def close_deal(self, opportunity_id: str, status: str, final_value: float = None) -> Dict:
"""Close a deal as won or lost."""
with self.db.get_connection() as conn:
cursor = conn.cursor()
try:
cursor.execute("""
UPDATE opportunities
SET status = ?, estimated_value = COALESCE(?, estimated_value)
WHERE opportunity_id = ?
""", (status, final_value, opportunity_id))
# Update lead status
cursor.execute("""
UPDATE leads
SET status = ?, updated_at = CURRENT_TIMESTAMP
WHERE lead_id = (
SELECT lead_id FROM opportunities WHERE opportunity_id = ?
)
""", (f"closed_{status.lower()}", opportunity_id))
conn.commit()
return {"status": "success", "message": f"Deal {status}"}
except sqlite3.Error as e:
logger.error(f"Database error: {e}")
return {"status": "error", "message": str(e)}
# --- Agent Definitions ---
def create_agents(sales_functions: SalesFunctions):
lead_qualification_agent = Agent(
name="Lead Qualification Agent",
instructions="""You are a lead qualification agent responsible for:
1. Analyzing new leads
2. Scoring leads based on criteria
3. Determining lead priority
4. Qualifying leads for sales process
Always collect:
- Lead ID
- Company information
- Industry details
- Basic financial information""",
functions=[sales_functions.qualify_lead]
)
sales_strategy_agent = Agent(
name="Sales Strategy Agent",
instructions="""You are a sales strategy agent responsible for:
1. Analyzing qualified leads
2. Recommending products
3. Creating opportunities
4. Developing sales approaches
Focus on matching products to customer needs and maximizing deal value.""",
functions=[sales_functions.create_opportunity]
)
product_specialist_agent = Agent(
name="Product Specialist Agent",
instructions="""You are a product specialist agent responsible for:
1. Providing detailed product information
2. Answering technical questions
3. Suggesting product configurations
4. Supporting proposal generation
Ensure accurate product details and competitive positioning.""",
functions=[sales_functions.generate_proposal]
)
deal_closure_agent = Agent(
name="Deal Closure Agent",
instructions="""You are a deal closure agent responsible for:
1. Negotiating final terms
2. Handling objections
3. Closing deals
4. Recording outcomes
Focus on maximizing close rates and maintaining deal values.""",
functions=[sales_functions.close_deal]
)
return {
"qualification": lead_qualification_agent,
"strategy": sales_strategy_agent,
"product": product_specialist_agent,
"closure": deal_closure_agent
}
# --- Sales Pipeline System Class ---
class SalesPipelineSystem:
def __init__(self):
self.db_manager = DatabaseManager()
self.sales_functions = SalesFunctions(self.db_manager)
self.agents = create_agents(self.sales_functions)
self.client = Swarm()
self.current_agent = self.agents["qualification"]
self.context = {}
def initialize_poc(self):
"""Initialize POC with sample data"""
self.db_manager.load_sample_data()
logger.info("POC data initialized")
def handle_request(self, lead_id: str, message: str):
"""Handle incoming sales request"""
messages = [
{
"role": "system",
"content": f"Lead ID: {lead_id}\nRequest: {message}"
},
{
"role": "user",
"content": message
}
]
response = self.client.run(
agent=self.current_agent,
messages=messages,
context_variables=self.context
)
self.current_agent = response.agent
self.context = response.context_variables
return response.messages[-1]["content"]
def display_pipeline(self):
"""Display current sales pipeline in a formatted table"""
with self.db_manager.get_connection() as conn:
df = pd.read_sql_query("""
SELECT
l.lead_id,
l.company_name,
l.status as lead_status,
l.priority,
o.opportunity_id,
o.estimated_value,
o.probability,
o.status as opp_status
FROM leads l
LEFT JOIN opportunities o ON l.lead_id = o.lead_id
ORDER BY l.priority, l.created_at DESC
""", conn)
table = Table(title="Sales Pipeline Overview")
for column in df.columns:
table.add_column(column)
for _, row in df.iterrows():
table.add_row(*[str(value) for value in row])
console.print(table)
# --- Main Execution ---
# --- Main Execution ---
def run_sales_pipeline():
pipeline_system = SalesPipelineSystem()
pipeline_system.initialize_poc()
console.print("[bold green]Sales Pipeline System Initialized[/bold green]")
console.print("[bold blue]Available commands:[/bold blue]")
console.print("1. 'pipeline' to display current sales pipeline")
console.print("2. 'qualify <lead_id>' to qualify a new lead")
console.print("3. 'strategy <lead_id>' to develop sales strategy")
console.print("4. 'product <lead_id>' to get product specialist")
console.print("5. 'close <lead_id>' to work on deal closure")
console.print("6. 'docs <lead_id>' to handle documentation")
console.print("7. 'quit' to exit")
while True:
command = input("\nEnter command: ").strip().lower()
if command == 'quit':
break
elif command == 'pipeline':
pipeline_system.display_pipeline()
continue
# Parse command and lead_id
parts = command.split()
if len(parts) != 2:
console.print("[red]Invalid command format. Use '<command> <lead_id>'[/red]")
continue
command, lead_id = parts
# Switch to appropriate agent based on command
try:
if command == 'qualify':
pipeline_system.current_agent = pipeline_system.agents["qualification"]
elif command == 'strategy':
pipeline_system.current_agent = pipeline_system.agents["strategy"]
elif command == 'product':
pipeline_system.current_agent = pipeline_system.agents["product"]
elif command == 'close':
pipeline_system.current_agent = pipeline_system.agents["closure"]
elif command == 'docs':
pipeline_system.current_agent = pipeline_system.agents["documentation"]
else:
console.print("[red]Invalid command[/red]")
continue
message = input("Enter your request: ")
response = pipeline_system.handle_request(lead_id, message)
console.print(f"\n[green]Agent Response:[/green] {response}\n")
except Exception as e:
logger.error(f"Error processing request: {str(e)}")
console.print(f"[red]Error processing request: {str(e)}[/red]")
# --- Documentation Agent Addition ---
def create_documentation_agent(sales_functions: SalesFunctions):
return Agent(
name="Documentation Agent",
instructions="""You are a documentation agent responsible for:
1. Generating sales documentation
2. Creating and maintaining proposals
3. Tracking deal changes and versions
4. Managing compliance requirements
Ensure all documents are accurate, complete, and compliant with standards.
Track all versions and changes to maintain audit trail.""",
functions=[
sales_functions.generate_proposal,
sales_functions.document_deal_changes
]
)
# Add document_deal_changes function to SalesFunctions class
def document_deal_changes(self, opportunity_id: str, change_type: str, details: str) -> Dict:
"""Document changes to a deal including proposal versions, term changes, etc."""
with self.db.get_connection() as conn:
cursor = conn.cursor()
document_id = f"DOC_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"
try:
cursor.execute("""
INSERT INTO deal_documents
(document_id, opportunity_id, change_type, details, version, created_at)
VALUES (?, ?, ?, ?, (
SELECT COALESCE(MAX(version), 0) + 1
FROM deal_documents
WHERE opportunity_id = ?
), CURRENT_TIMESTAMP)
""", (document_id, opportunity_id, change_type, details, opportunity_id))
conn.commit()
return {
"status": "success",
"document_id": document_id,
"message": f"Deal change documented: {change_type}"
}
except sqlite3.Error as e:
logger.error(f"Database error: {e}")
return {"status": "error", "message": str(e)}
# --- Main Execution (Continued) ---
console.print("[bold blue]Available commands:[/bold blue]")
console.print("1. 'pipeline' to display current sales pipeline")
console.print("2. 'qualify <lead_id>' to qualify a new lead")
console.print("3. 'strategy <lead_id>' to develop sales strategy")
console.print("4. 'product <lead_id>' to get product specialist")
console.print("5. 'close <lead_id>' to work on deal closure")
console.print("6. 'docs <lead_id>' to handle documentation")
console.print("7. 'quit' to exit")
while True:
command = input("\nEnter command: ").strip().lower()
if command == 'quit':
break
elif command == 'pipeline':
pipeline_system.display_pipeline()
continue
# Parse command and lead_id
parts = command.split()
if len(parts) != 2:
console.print("[red]Invalid command format. Use '<command> <lead_id>'[/red]")
continue
command, lead_id = parts
# Switch to appropriate agent based on command
try:
if command == 'qualify':
pipeline_system.current_agent = pipeline_system.agents["qualification"]
elif command == 'strategy':
pipeline_system.current_agent = pipeline_system.agents["strategy"]
elif command == 'product':
pipeline_system.current_agent = pipeline_system.agents["product"]
elif command == 'close':
pipeline_system.current_agent = pipeline_system.agents["closure"]
elif command == 'docs':
pipeline_system.current_agent = pipeline_system.agents["documentation"]
else:
console.print("[red]Invalid command[/red]")
continue
message = input("Enter your request: ")
response = pipeline_system.handle_request(lead_id, message)
console.print(f"\n[green]Agent Response:[/green] {response}\n")
except Exception as e:
logger.error(f"Error processing request: {str(e)}")
console.print(f"[red]Error processing request: {str(e)}[/red]")
if __name__ == "__main__":
run_sales_pipeline()
# --- Add new table for Documentation Agent ---
def init_documentation_tables(self):
"""Initialize tables needed for documentation tracking"""
with self.get_connection() as conn:
cursor = conn.cursor()
# Create Deal Documents table
cursor.execute("""
CREATE TABLE IF NOT EXISTS deal_documents (
document_id TEXT PRIMARY KEY,
opportunity_id TEXT NOT NULL,
change_type TEXT NOT NULL,
details TEXT NOT NULL,
version INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (opportunity_id) REFERENCES opportunities (opportunity_id)
)
""")
# Create Document Templates table
cursor.execute("""
CREATE TABLE IF NOT EXISTS document_templates (
template_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
content TEXT NOT NULL,
document_type TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
# --- Add sample templates ---
def load_sample_templates(self):
"""Load sample document templates"""
with self.get_connection() as conn:
cursor = conn.cursor()
templates = [
("TMPL001", "Standard Proposal", "Standard proposal template content...", "proposal"),
("TMPL002", "Enterprise Agreement", "Enterprise agreement template content...", "agreement"),
("TMPL003", "Change Order", "Change order template content...", "change_order"),
]
cursor.executemany(
"""INSERT OR REPLACE INTO document_templates
(template_id, name, content, document_type)
VALUES (?, ?, ?, ?)""",
templates
)
conn.commit()
OpenAI Swarm Framework Cheat Sheet
Quick Start
from swarm import Swarm, Agent
# Initialize Swarm
client = Swarm()
# Create basic agent
agent = Agent(
name="Basic Agent",
instructions="You are a helpful agent.",
)
# Run agent
response = client.run(
agent=agent,
messages=[{"role": "user", "content": "Hello!"}]
)
Agent Creation
Basic Agent
agent = Agent(
name="Agent Name",
model="gpt-4o", # Optional, defaults to gpt-4o
instructions="Detailed instructions here",
functions=[function1, function2] # Optional
)
Agent with Dynamic Instructions
def dynamic_instructions(context_variables):
user_name = context_variables["user_name"]
return f"Help {user_name} with their request."
agent = Agent(
name="Dynamic Agent",
instructions=dynamic_instructions
)
Function Integration
Basic Function
def greet(name: str, language: str = "English") -> str:
"""Greet user in specified language.
Args:
name: User's name
language: Language for greeting
"""
return f"Hello, {name}!"
Function with Context
def process_data(context_variables: dict, data_type: str):
user_id = context_variables["user_id"]
return f"Processing {data_type} for user {user_id}"
Agent Handoff Function
def transfer_to_sales():
"""Transfer to sales department."""
return sales_agent # Returns another Agent instance
Context Management
Setting Context Variables
response = client.run(
agent=agent,
messages=messages,
context_variables={
"user_id": "123",
"preferences": {"language": "en"}
}
)
Running Swarm
Basic Run
response = client.run(
agent=agent,
messages=[{"role": "user", "content": "Hello"}]
)python
Run with Options
response = client.run(
agent=agent,
messages=messages,
context_variables=context,
max_turns=5,
model_override="gpt-4",
execute_tools=True,
stream=False,
debug=False
)
Streaming Response
stream = client.run(agent=agent, messages=messages, stream=True)
for chunk in stream:
print(chunk)
Message Handling
Message Structure
messages = [
{"role": "system", "content": "System message"},
{"role": "user", "content": "User message"},
{"role": "assistant", "content": "Assistant response"}
]
Response Processing
response = client.run(agent=agent, messages=messages)
last_message = response.messages[-1]["content"]
current_agent = response.agent
updated_context = response.context_variables
Common Patterns
Multi-Agent Setup
def create_agents():
triage_agent = Agent(
name="Triage",
instructions="Route requests appropriately",
functions=[transfer_to_specialist]
)
specialist_agent = Agent(
name="Specialist",
instructions="Handle specialized tasks",
functions=[process_task]
)
return {"triage": triage_agent, "specialist": specialist_agent}
Interactive Loop
def run_interactive():
messages = []
current_agent = initial_agent
while True:
user_input = input("> ")
messages.append({"role": "user", "content": user_input})
response = client.run(
agent=current_agent,
messages=messages
)
messages = response.messages
current_agent = response.agent
Best Practices
Agent Design
- Keep instructions focused and specific
- Use clear handoff conditions
- Implement proper error handling
- Include detailed function documentation
Function Implementation
def process_request(
context_variables: dict,
request_id: str,
priority: str = "medium"
) -> dict:
"""Process a user request with given priority.
Args:
context_variables: Current context
request_id: Unique request identifier
priority: Request priority (low/medium/high)
Returns:
dict: Processing result
"""
try:
# Processing logic
return {"status": "success", "result": result}
except Exception as e:
return {"status": "error", "message": str(e)}
Error Handling
def safe_execution(func):
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
return {"status": "error", "message": str(e)}
return wrapper
Testing
Basic Test Setup
def test_agent_response():
client = Swarm()
response = client.run(
agent=test_agent,
messages=[{"role": "user", "content": "test message"}],
execute_tools=False
)
assert response.messages[-1]["content"] is not None
Function Call Testing
def test_function_calls():
tool_calls = run_and_get_tool_calls(
agent=agent,
query="Process this request"
)
assert len(tool_calls) == 1
assert tool_calls[0]["function"]["name"] == "process_request"
Debugging
Enable Debug Mode
response = client.run(
agent=agent,
messages=messages,
debug=True
)
Logging Setup
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
Remember:
- Test thoroughly before production
- Monitor agent performance
- Keep functions simple and focused
- Maintain clear documentation
- Use type hints and docstrings
- Implement proper error handling
- Follow security best practices
Final Thoughts
While Swarm is currently an experimental framework, its patterns and principles provide valuable insights into the future of AI system design. The combination of simplicity, flexibility, and power makes it an excellent starting point for developers looking to build sophisticated multi-agent systems.
Whether you’re building customer service solutions, sales automation systems, or other complex AI applications, our explored patterns provide a solid foundation for future development. As AI continues to evolve, the principles of clear agent specialization, smooth handoffs, and maintainable architecture will remain crucial to successful implementations.
Remember that the true power of Swarm lies not just in its technical capabilities, but in how it helps us think about and structure AI systems. By embracing these patterns and continuing to innovate upon them, we can build increasingly sophisticated AI solutions that are both powerful and maintainable.
As you begin your journey with Swarm, stay curious, experiment freely, and most importantly, focus on solving real business problems. The framework provides the tools — how you use them to create value is limited only by your imagination.
Resources for Further Learning
- OpenAI’s Official Swarm Documentation
- Swarm Examples Repository
- Multi-Agent Systems Research Papers
- AI Orchestration Best Practices
Glossary of Terms: OpenAI Swarm and AI Agents
A
Agent : A specialized AI component with specific instructions and tools, designed to perform particular tasks or handle specific aspects of a conversation.
Agent Handoff : The process of transferring control of a conversation from one agent to another while maintaining context and conversation history.
API (Application Programming Interface) : The interface that allows communication between Swarm and OpenAI’s language models, as well as other services.
C
Context Variables : Data that persists across different agent interactions and can be accessed by functions and agents throughout a conversation.
Conversation History : The complete record of messages exchanged between users and agents during an interaction.
Completion : A response generated by the language model based on the provided prompt and context.
F
Function Calling : The ability of an agent to execute predefined Python functions to perform specific tasks or retrieve information.
Function Schema : A JSON representation of a Python function that describes its parameters, types, and documentation to the AI model.
H
Handoff Function : A special type of function that returns another agent, enabling the transfer of conversation control between agents.
I
Instructions : The specific guidance provided to an agent that defines its role, responsibilities, and behavior (implemented as system prompt).
Intent : The underlying purpose or goal of a user’s message that agents need to understand and act upon.
M
Messages : Individual units of communication between users and agents, including system messages, user inputs, and agent responses.
Model Override : The ability to specify a different language model for specific agent interactions.
O
Orchestration : The coordination and management of multiple agents working together to handle complex tasks.
P
Pipeline : A sequence of agent interactions and operations designed to handle a specific business process or workflow.
Prompt : The combination of instructions, context, and messages that are sent to the language model to generate responses.
R
Response : The object returned by Swarm containing messages, current agent, and context variables after processing a request.
Routine : A set of steps and instructions that define how an agent should handle specific situations or tasks.
S
Stream : A mode of operation where the agent’s response is returned incrementally rather than all at once.
Swarm : The main client class that handles the execution of agents and manages their interactions.
System Message : The initial message containing an agent’s instructions that defines its behavior and capabilities.
T
Tool : A function that an agent can call to perform specific actions or retrieve information.
Tool Choice : The specification of which tools an agent should consider using in its responses.
Tool Schema : The JSON structure that defines the interface of a tool, including its parameters and expected outputs.
Advanced Concepts
Context Management : The system for maintaining and updating state information across multiple agent interactions.
Embedding : Vector representations of text used for semantic search and similarity comparisons in knowledge bases.
Function Broadcasting : The ability to make specific functions available to multiple agents within a system.
Message Flow Control : The management and routing of messages between different agents and components in a Swarm system.
Pipeline Orchestration : The coordination of multiple agents and processes in a specific sequence to achieve complex business objectives.
State Machine : A model representing the different states an interaction can be in and the valid transitions between them.
Vector Database : A specialized database used to store and query vector embeddings for efficient information retrieval.
Business Concepts
Agent Specialization : The practice of creating agents with focused expertise for specific tasks or domains.
Conversation Flow : The structured progression of interactions between users, agents, and systems.
Knowledge Base : A repository of information that agents can query to provide accurate responses.
SLA (Service Level Agreement) : Defined performance metrics and response time expectations for agent interactions.
OpenAI’s Swarm: The Future of Multi-Agent AI Systems Explained was originally published in Level Up Coding on Medium, where people are continuing the conversation by highlighting and responding to this story.
This content originally appeared on Level Up Coding - Medium and was authored by Senthil E
Senthil E | Sciencx (2024-10-24T01:06:08+00:00) OpenAI’s Swarm: The Future of Multi-Agent AI Systems Explained. Retrieved from https://www.scien.cx/2024/10/24/openais-swarm-the-future-of-multi-agent-ai-systems-explained/
Please log in to upload a file.
There are no updates yet.
Click the Upload button above to add an update.