# DSPy.rb Observability
DSPy.rb provides an event-driven observability system built on OpenTelemetry. The system replaces monkey-patching with structured event emission, pluggable listeners, automatic span creation, and non-blocking Langfuse export.
## Event System

### Emitting Events

Emit structured events with `DSPy.event`:

```ruby
DSPy.event('lm.tokens', {
  'gen_ai.system' => 'openai',
  'gen_ai.request.model' => 'gpt-4',
  input_tokens: 150,
  output_tokens: 50,
  total_tokens: 200
})
```
Event names are strings with dot-separated namespaces (e.g., 'llm.generate', 'react.iteration_complete', 'chain_of_thought.reasoning_complete'). Do not use symbols for event names.
Attributes must be JSON-serializable. DSPy automatically merges context (trace ID, module stack) and creates OpenTelemetry spans.
### Global Subscriptions

Subscribe to events across the entire application with `DSPy.events.subscribe`:

```ruby
# Exact event name
subscription_id = DSPy.events.subscribe('lm.tokens') do |event_name, attrs|
  puts "Tokens used: #{attrs[:total_tokens]}"
end

# Wildcard pattern: matches llm.generate, llm.stream, etc.
DSPy.events.subscribe('llm.*') do |event_name, attrs|
  track_llm_usage(attrs) # application-defined
end

# Catch-all wildcard
DSPy.events.subscribe('*') do |event_name, attrs|
  log_everything(event_name, attrs) # application-defined
end
```
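The wildcard rules above can be approximated in plain Ruby. The sketch below is an assumption about the semantics and is not DSPy.rb's matcher. It treats `*` as "any run of characters", so under this assumption `'llm.*'` would also match a deeper name such as `'llm.stream.chunk'`. `event_matches?` is a hypothetical helper that runs without DSPy installed.

```ruby
# Hypothetical helper approximating subscription-pattern matching.
# Assumption: '*' matches any run of characters, including dots.
# Illustrative sketch only, not DSPy.rb's internal matcher.
def event_matches?(pattern, event_name)
  # Escape regex metacharacters (so '.' is literal), then turn each
  # escaped '*' back into '.*'.
  source = Regexp.escape(pattern).gsub('\*', '.*')
  Regexp.new("\\A#{source}\\z").match?(event_name)
end

event_matches?('lm.tokens', 'lm.tokens') # exact name
event_matches?('llm.*', 'llm.generate')  # namespace wildcard
event_matches?('llm.*', 'lm.tokens')     # different namespace, no match
event_matches?('*', 'score.create')      # catch-all
```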
Use global subscriptions for cross-cutting concerns: observability exporters (Langfuse, Datadog), centralized logging, metrics collection.
### Module-Scoped Subscriptions

Declare listeners inside a `DSPy::Module` subclass. Subscriptions are automatically scoped to the module instance and its descendants:

```ruby
class ResearchReport < DSPy::Module
  subscribe 'lm.tokens', :track_tokens, scope: :descendants

  attr_reader :token_count # expose the running total to callers

  def initialize
    super
    # OutlineSignature and SectionWriterSignature are defined elsewhere
    @outliner = DSPy::Predict.new(OutlineSignature)
    @writer = DSPy::Predict.new(SectionWriterSignature)
    @token_count = 0
  end

  def forward(question:)
    outline = @outliner.call(question: question)
    outline.sections.map do |title|
      draft = @writer.call(question: question, section_title: title)
      { title: title, body: draft.paragraph }
    end
  end

  def track_tokens(_event, attrs)
    @token_count += attrs.fetch(:total_tokens, 0)
  end
end
```
The `scope:` parameter accepts:

- `:descendants` (default): receives events from the module and every nested module invoked inside it.
- `DSPy::Module::SubcriptionScope::SelfOnly`: restricts delivery to events emitted by the module instance itself and ignores descendants.

Inspect active subscriptions with `registered_module_subscriptions`. Tear them down with `unsubscribe_module_events`.
### Unsubscribe and Cleanup

Remove a global listener by subscription ID:

```ruby
id = DSPy.events.subscribe('llm.*') { |_name, _attrs| } # listener body omitted
DSPy.events.unsubscribe(id)
```
Build tracker classes that manage their own subscription lifecycle:

```ruby
class TokenBudgetTracker
  def initialize(budget:)
    @budget = budget
    @usage = 0
    @subscriptions = []
    @subscriptions << DSPy.events.subscribe('lm.tokens') do |_event, attrs|
      @usage += attrs.fetch(:total_tokens, 0)
      warn("Budget hit") if @usage >= @budget
    end
  end

  def unsubscribe
    @subscriptions.each { |id| DSPy.events.unsubscribe(id) }
    @subscriptions.clear
  end
end
```
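To see why the subscription ID matters, here is a toy, single-threaded event bus that has the same shape as the API above. `subscribe` returns an ID, `unsubscribe(id)` removes one listener, and `clear_listeners` removes all of them. `ToyEventBus` is hypothetical and matches exact names only. In an application, use `DSPy.events`.

```ruby
require 'securerandom'

# Hypothetical toy bus mirroring subscribe/unsubscribe/clear_listeners.
# Exact-name matching only; not DSPy.rb's implementation.
class ToyEventBus
  def initialize
    @listeners = {} # subscription id => [pattern, block]
  end

  def subscribe(pattern, &block)
    id = SecureRandom.uuid
    @listeners[id] = [pattern, block]
    id
  end

  def unsubscribe(id)
    @listeners.delete(id)
  end

  def clear_listeners
    @listeners.clear
  end

  def emit(event_name, attrs)
    @listeners.each_value do |pattern, block|
      block.call(event_name, attrs) if pattern == event_name
    end
  end

  def listener_count
    @listeners.size
  end
end

bus = ToyEventBus.new
usage = 0
id = bus.subscribe('lm.tokens') { |_name, attrs| usage += attrs.fetch(:total_tokens, 0) }
bus.emit('lm.tokens', { total_tokens: 200 })
bus.unsubscribe(id)
bus.emit('lm.tokens', { total_tokens: 200 }) # no listener left, so not counted
```

While a listener stays registered, its block keeps every object it closes over reachable. This is why long-lived trackers should unsubscribe.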
### Clearing Listeners in Tests

Call `DSPy.events.clear_listeners` in `before`/`after` blocks to prevent cross-contamination between test cases:

```ruby
RSpec.configure do |config|
  config.after(:each) { DSPy.events.clear_listeners }
end
```
## dspy-o11y Gems

Three gems compose the observability stack:

| Gem | Purpose |
|---|---|
| `dspy` | Core event bus (`DSPy.event`, `DSPy.events`); always available |
| `dspy-o11y` | OpenTelemetry spans, `AsyncSpanProcessor`, `DSPy::Context.with_span` helpers |
| `dspy-o11y-langfuse` | Langfuse adapter; configures an OTLP exporter targeting Langfuse endpoints |
### Installation

```ruby
# Gemfile
gem 'dspy'
gem 'dspy-o11y'          # core spans + helpers
gem 'dspy-o11y-langfuse' # Langfuse/OpenTelemetry adapter (optional)
```

If the optional gems are absent, DSPy falls back to logging-only mode and raises no errors.
## Langfuse Integration

### Environment Variables

```bash
# Required
export LANGFUSE_PUBLIC_KEY=pk-lf-your-public-key
export LANGFUSE_SECRET_KEY=sk-lf-your-secret-key

# Optional (defaults to https://cloud.langfuse.com)
export LANGFUSE_HOST=https://us.cloud.langfuse.com

# Tuning (optional)
export DSPY_TELEMETRY_BATCH_SIZE=100        # spans per export batch (default 100)
export DSPY_TELEMETRY_QUEUE_SIZE=1000       # max queued spans (default 1000)
export DSPY_TELEMETRY_EXPORT_INTERVAL=60    # seconds between timed exports (default 60)
export DSPY_TELEMETRY_SHUTDOWN_TIMEOUT=10   # seconds to drain on shutdown (default 10)
```
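DSPy reads these variables itself. To log the effective tuning values at boot, you could use a helper like the hypothetical `telemetry_settings` below, which resolves each variable against the defaults documented above. It accepts any hash, so it can be exercised without touching `ENV`. The helper and its constant name are illustrative and not part of DSPy.rb.

```ruby
# Hypothetical helper: resolves the DSPy telemetry tuning variables against
# the documented defaults, e.g. for logging effective settings at boot.
TELEMETRY_DEFAULTS = {
  'DSPY_TELEMETRY_BATCH_SIZE' => 100,
  'DSPY_TELEMETRY_QUEUE_SIZE' => 1000,
  'DSPY_TELEMETRY_EXPORT_INTERVAL' => 60,
  'DSPY_TELEMETRY_SHUTDOWN_TIMEOUT' => 10
}.freeze

def telemetry_settings(env = ENV)
  TELEMETRY_DEFAULTS.to_h do |name, default|
    # Integer() raises on malformed values instead of silently yielding 0.
    [name, env.key?(name) ? Integer(env[name]) : default]
  end
end

settings = telemetry_settings({ 'DSPY_TELEMETRY_BATCH_SIZE' => '50' })
settings['DSPY_TELEMETRY_BATCH_SIZE'] # overridden value
settings['DSPY_TELEMETRY_QUEUE_SIZE'] # documented default
```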
### Automatic Configuration

`DSPy::Observability.configure!` must run once at boot. When the Langfuse environment variables are present, `require 'dspy'` calls it automatically, so no explicit call is needed:

```ruby
require 'dspy'

# If LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY are set,
# DSPy::Observability.configure! runs automatically and:
# 1. Configures the OpenTelemetry SDK with an OTLP exporter
# 2. Creates dual output: structured logs AND OpenTelemetry spans
# 3. Exports spans to Langfuse using proper authentication
# 4. Falls back gracefully if gems are missing
```

Verify the status with `DSPy::Observability.enabled?`.
### Automatic Tracing

With observability enabled, every `DSPy::Module#forward` call, LM request, and tool invocation creates properly nested spans. Langfuse receives hierarchical traces:

```text
Trace: abc-123-def
+-- ChainOfThought.forward [2000ms] (observation type: chain)
    +-- llm.generate [1000ms] (observation type: generation)
        Model: gpt-4-0613
        Tokens: 100 in / 50 out / 150 total
```
DSPy maps module classes to Langfuse observation types automatically via `DSPy::ObservationType.for_module_class`:

| Module | Observation Type |
|---|---|
| `DSPy::LM` (raw chat) | `generation` |
| `DSPy::ChainOfThought` | `chain` |
| `DSPy::ReAct` | `agent` |
| Tool invocations | `tool` |
| Memory/retrieval | `retriever` |
| Embedding engines | `embedding` |
| Evaluation modules | `evaluator` |
| Generic operations | `span` |
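For intuition, the class-named rows of this table behave like a lookup with a generic fallback. The sketch below is hypothetical. It keys on class-name strings so it runs without DSPy, covers only the three rows that name a class, and does exact-name lookup. The real `DSPy::ObservationType.for_module_class` may resolve types differently.

```ruby
# Hypothetical lookup mirroring the class-named rows of the table above.
# Not DSPy::ObservationType.for_module_class; exact class names only.
OBSERVATION_TYPES = {
  'DSPy::LM' => 'generation',
  'DSPy::ChainOfThought' => 'chain',
  'DSPy::ReAct' => 'agent'
}.freeze

def observation_type_for(class_name)
  # Anything not listed is treated as a generic operation.
  OBSERVATION_TYPES.fetch(class_name, 'span')
end

observation_type_for('DSPy::ReAct')       # "agent"
observation_type_for('MyApp::CustomStep') # "span" (hypothetical class name)
```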
## Score Reporting

### DSPy.score API

Report evaluation scores with `DSPy.score`:

```ruby
# Numeric (default)
DSPy.score('accuracy', 0.95)

# With comment
DSPy.score('relevance', 0.87, comment: 'High semantic similarity')

# Boolean
DSPy.score('is_valid', 1, data_type: DSPy::Scores::DataType::Boolean)

# Categorical
DSPy.score('sentiment', 'positive', data_type: DSPy::Scores::DataType::Categorical)

# Explicit trace binding
DSPy.score('accuracy', 0.95, trace_id: 'custom-trace-id')
```

Available data types: `DSPy::Scores::DataType::Numeric`, `::Boolean`, and `::Categorical`.
### score.create Events

Every `DSPy.score` call emits a `'score.create'` event. Subscribe to it to react to new scores:

```ruby
DSPy.events.subscribe('score.create') do |event_name, attrs|
  puts "#{attrs[:score_name]} = #{attrs[:score_value]}"
  # Also available: attrs[:score_id], attrs[:score_data_type],
  # attrs[:score_comment], attrs[:trace_id], attrs[:observation_id],
  # attrs[:timestamp]
end
```
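A common use of this event is in-process aggregation. The sketch below averages numeric scores per name, using payloads shaped like the attributes listed above. `ScoreAggregator` is hypothetical. In an application, its `record` method would be called from inside the `'score.create'` subscription block. Non-numeric (categorical) values are skipped.

```ruby
# Hypothetical aggregator for score.create payloads. In an application,
# #record would be called from inside
# DSPy.events.subscribe('score.create') { |_name, attrs| ... }.
class ScoreAggregator
  def initialize
    @values = Hash.new { |hash, key| hash[key] = [] }
  end

  def record(attrs)
    value = attrs[:score_value]
    # Categorical scores (strings) cannot be averaged, so skip them.
    @values[attrs[:score_name]] << value if value.is_a?(Numeric)
  end

  # Mean value per score name.
  def means
    @values.transform_values { |values| values.sum.to_f / values.size }
  end
end

agg = ScoreAggregator.new
agg.record({ score_name: 'accuracy', score_value: 1.0 })
agg.record({ score_name: 'accuracy', score_value: 0.5 })
agg.record({ score_name: 'sentiment', score_value: 'positive' })
agg.means # only 'accuracy' appears
```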
### Async Langfuse Export with DSPy::Scores::Exporter

Configure the exporter to send scores to Langfuse in the background:

```ruby
exporter = DSPy::Scores::Exporter.configure(
  public_key: ENV['LANGFUSE_PUBLIC_KEY'],
  secret_key: ENV['LANGFUSE_SECRET_KEY'],
  host: 'https://cloud.langfuse.com'
)

# Scores are now exported automatically via a background Thread::Queue
DSPy.score('accuracy', 0.95)

# Shut down gracefully (waits up to 5 seconds by default)
exporter.shutdown
```
The exporter subscribes to 'score.create' events internally, queues them for async processing, and retries with exponential backoff on failure.
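The queue-plus-backoff pattern described above can be sketched in plain Ruby. This is not the code of `DSPy::Scores::Exporter`. `BackgroundExporter` is hypothetical, the block passed to `new` stands in for the HTTP call to Langfuse, and the retry limit and base delay are illustrative defaults.

```ruby
# Hypothetical sketch of a queue-backed exporter with exponential backoff.
# Not DSPy::Scores::Exporter's implementation; the block given to `new`
# stands in for the network call to Langfuse.
class BackgroundExporter
  STOP = Object.new.freeze # sentinel telling the worker to exit

  def initialize(max_retries: 3, base_delay: 0.1, &send_fn)
    @queue = Thread::Queue.new
    @max_retries = max_retries
    @base_delay = base_delay
    @send_fn = send_fn
    @worker = Thread.new { run }
  end

  # Called from the hot path; only pushes onto the queue.
  def enqueue(payload)
    @queue << payload
  end

  # Lets the worker drain queued payloads, waiting at most `timeout` seconds.
  def shutdown(timeout: 5)
    @queue << STOP
    @worker.join(timeout)
  end

  private

  def run
    loop do
      payload = @queue.pop
      break if payload.equal?(STOP)

      deliver(payload)
    end
  end

  # Retries after base_delay, 2x, 4x, ... and gives up after max_retries.
  def deliver(payload)
    attempt = 0
    begin
      @send_fn.call(payload)
    rescue StandardError
      attempt += 1
      return if attempt > @max_retries # a real exporter would log the drop

      sleep(@base_delay * 2**(attempt - 1))
      retry
    end
  end
end

sent = []
failures_left = 1
exporter = BackgroundExporter.new(base_delay: 0.01) do |payload|
  if failures_left.positive?
    failures_left -= 1
    raise 'transient network error' # first attempt fails, the retry succeeds
  end
  sent << payload
end
exporter.enqueue({ name: 'accuracy', value: 0.95 })
exporter.shutdown
```

The caller's thread only pays for a queue push. All network work and sleeping happen on the worker thread.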
### Automatic Export with DSPy::Evals

Pass `export_scores: true` to `DSPy::Evals` to export per-example scores and an aggregate batch score automatically:

```ruby
# program, my_metric, and test_examples are defined by your application
evaluator = DSPy::Evals.new(
  program,
  metric: my_metric,
  export_scores: true,
  score_name: 'qa_accuracy'
)

result = evaluator.evaluate(test_examples)
```
## DSPy::Context.with_span

Create manual spans for custom operations. Requires `dspy-o11y`.

```ruby
# pinecone_client and embedding are application-defined placeholders
DSPy::Context.with_span(operation: 'custom.retrieval', 'retrieval.source' => 'pinecone') do |span|
  results = pinecone_client.query(embedding)
  span&.set_attribute('retrieval.count', results.size) # span is nil when observability is disabled
  results
end
```

Pass semantic attributes as keyword arguments alongside `operation:`. The block receives an OpenTelemetry span object, or `nil` when observability is disabled. The span nests under the current parent span automatically and records `duration.ms`, `langfuse.observation.startTime`, and `langfuse.observation.endTime`.
Assign a Langfuse observation type to custom spans:

```ruby
DSPy::Context.with_span(
  operation: 'evaluate.batch',
  **DSPy::ObservationType::Evaluator.langfuse_attributes,
  'batch.size' => examples.length
) do |span|
  run_evaluation(examples) # application-defined
end
```
Scores reported inside a with_span block automatically inherit the current trace context.
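The nesting behaviour described above can be illustrated without OpenTelemetry: each span records its parent and a duration. `ToySpans` is a hypothetical stand-in, not `DSPy::Context`. It keeps a stack in `Thread.current` and accepts string-keyed attributes alongside `operation:` the same way the calls above do. It measures duration in milliseconds with a monotonic clock.

```ruby
# Hypothetical stand-in for nested spans (not DSPy::Context, no OpenTelemetry).
# A stack in Thread.current tracks the active span so each new span can record
# its parent; durations use a monotonic clock, in milliseconds.
module ToySpans
  Span = Struct.new(:operation, :parent, :attributes, :duration_ms)

  # Finished spans, in the order they closed (innermost first).
  def self.finished
    Thread.current[:toy_spans_finished] ||= []
  end

  def self.with_span(operation:, **attributes)
    stack = (Thread.current[:toy_spans_stack] ||= [])
    span = Span.new(operation, stack.last&.operation, attributes, nil)
    stack.push(span)
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    begin
      yield span
    ensure
      span.duration_ms = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) * 1000.0
      stack.pop
      finished << span
    end
  end
end

result = ToySpans.with_span(operation: 'evaluate.batch', 'batch.size' => 2) do
  ToySpans.with_span(operation: 'custom.retrieval') { [1, 2, 3] }.size
end
```

Because the span is closed in `ensure`, its duration is recorded and the stack is popped even when the block raises. This is the property that keeps parent links correct for later spans.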
## Module Stack Metadata

When `DSPy::Module#forward` runs, the context layer maintains a module stack. Every event includes:

```ruby
{
  module_path: [
    { id: "root_uuid", class: "DeepSearch", label: nil },
    { id: "planner_uuid", class: "DSPy::Predict", label: "planner" }
  ],
  module_root: { id: "root_uuid", class: "DeepSearch", label: nil },
  module_leaf: { id: "planner_uuid", class: "DSPy::Predict", label: "planner" },
  module_scope: {
    ancestry_token: "root_uuid>planner_uuid",
    depth: 2
  }
}
```
| Key | Meaning |
|---|---|
| `module_path` | Ordered array of `{id, class, label}` entries from root to leaf |
| `module_root` | The outermost module in the current call chain |
| `module_leaf` | The innermost (currently executing) module |
| `module_scope.ancestry_token` | Stable string of joined UUIDs representing the nesting path |
| `module_scope.depth` | Integer depth of the current module in the stack |
Labels are set via module_scope_label= on a module instance or derived automatically from named predictors. Use this metadata to power Langfuse filters, scoped metrics, or custom event routing.
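In the example payload above, `ancestry_token` is the module IDs joined with `>`. If that holds, a listener can route events by subtree with a simple prefix test. The sketch assumes exactly that format, which is inferred from the example rather than from a documented guarantee. `module_scope_for` and `within_subtree?` are hypothetical helpers.

```ruby
# Hypothetical helpers. Assumption (from the example payload above):
# ancestry_token is the module ids joined with '>', and depth is the path length.
def module_scope_for(module_path)
  {
    ancestry_token: module_path.map { |entry| entry[:id] }.join('>'),
    depth: module_path.size
  }
end

# True when an event's token is the given module or nested anywhere under it.
# Matching on "token>" avoids false hits such as 'root_uuid2'.
def within_subtree?(event_token, root_token)
  event_token == root_token || event_token.start_with?("#{root_token}>")
end

path = [
  { id: 'root_uuid', class: 'DeepSearch', label: nil },
  { id: 'planner_uuid', class: 'DSPy::Predict', label: 'planner' }
]
scope = module_scope_for(path)
within_subtree?(scope[:ancestry_token], 'root_uuid') # event nested under DeepSearch
```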
## Dedicated Export Worker

The `DSPy::Observability::AsyncSpanProcessor` (from `dspy-o11y`) keeps telemetry export off the hot path:

- Runs on a `Concurrent::SingleThreadExecutor`, so LLM workflows never compete with OTLP networking.
- Buffers finished spans in a `Thread::Queue` (max size configurable via `DSPY_TELEMETRY_QUEUE_SIZE`).
- Drains spans in batches of `DSPY_TELEMETRY_BATCH_SIZE` (default 100). When the queue reaches batch size, an immediate async export fires.
- Triggers a periodic export from a background timer thread every `DSPY_TELEMETRY_EXPORT_INTERVAL` seconds (default 60).
- Applies exponential backoff (`0.1 * 2^attempt` seconds) on export failures, up to `DEFAULT_MAX_RETRIES` (3).
- On shutdown, flushes all remaining spans within `DSPY_TELEMETRY_SHUTDOWN_TIMEOUT` seconds, then terminates the executor.
- Drops the oldest span when the queue is full, logging `'observability.span_dropped'`.
No application code interacts with the processor directly. Configure it entirely through environment variables.
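Two of the buffering rules above are easy to misread: drop-oldest on overflow, and export once a batch is full. The single-threaded illustration below shows both. `SpanBuffer` is hypothetical and omits the executor, timer, and retries. It is not the `AsyncSpanProcessor` implementation.

```ruby
# Hypothetical bounded buffer illustrating drop-oldest and batch readiness.
# Not the AsyncSpanProcessor's code; no threads, for clarity.
class SpanBuffer
  attr_reader :dropped

  def initialize(max_size:, batch_size:)
    @max_size = max_size
    @batch_size = batch_size
    @spans = []
    @dropped = 0
  end

  # Returns true when at least one full batch is ready to export.
  def push(span)
    if @spans.size >= @max_size
      @spans.shift # drop the oldest span to make room
      @dropped += 1
    end
    @spans << span
    @spans.size >= @batch_size
  end

  # Removes and returns up to batch_size spans, oldest first.
  def drain_batch
    @spans.shift(@batch_size)
  end
end

buffer = SpanBuffer.new(max_size: 3, batch_size: 2)
ready = (1..4).map { |n| buffer.push("span-#{n}") }
# The fourth push overflows the buffer, so "span-1" is dropped.
```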
## Built-in Events Reference

| Event Name | Emitted By | Key Attributes |
|---|---|---|
| `lm.tokens` | `DSPy::LM` | `gen_ai.system`, `gen_ai.request.model`, `input_tokens`, `output_tokens`, `total_tokens` |
| `chain_of_thought.reasoning_complete` | `DSPy::ChainOfThought` | `dspy.signature`, `cot.reasoning_steps`, `cot.reasoning_length`, `cot.has_reasoning` |
| `react.iteration_complete` | `DSPy::ReAct` | `iteration`, `thought`, `action`, `observation` |
| `codeact.iteration_complete` | `dspy-code_act` gem | `iteration`, `code_executed`, `execution_result` |
| `optimization.trial_complete` | Teleprompters (MIPROv2) | `trial_number`, `score` |
| `score.create` | `DSPy.score` | `score_name`, `score_value`, `score_data_type`, `trace_id` |
| `span.start` | `DSPy::Context.with_span` | `trace_id`, `span_id`, `parent_span_id`, `operation` |
## Best Practices

- Use dot-separated string names for events. Follow OpenTelemetry `gen_ai.*` conventions for LLM attributes.
- To prevent memory leaks, always call `unsubscribe` (or `unsubscribe_module_events` for scoped subscriptions) when a tracker is no longer needed.
- Call `DSPy.events.clear_listeners` in test teardown to avoid cross-contamination.
- Wrap risky listener logic in a rescue block. The event system isolates listener failures, but an explicit rescue prevents domain errors from being silently swallowed.
- Prefer module-scoped `subscribe` for agent internals. Reserve global `DSPy.events.subscribe` for infrastructure-level concerns.
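You can follow the rescue advice without repeating `begin`/`rescue` in every listener by using a small wrapper that reports failures through a callback. `safe_listener` is a hypothetical helper and not part of DSPy.rb. The lambda it returns takes `(event_name, attrs)`.

```ruby
# Hypothetical helper (not part of DSPy.rb): wraps a listener body so that
# exceptions are reported through on_error instead of disappearing.
def safe_listener(on_error: ->(error, name) { warn("listener for #{name} failed: #{error.message}") }, &body)
  lambda do |event_name, attrs|
    body.call(event_name, attrs)
  rescue StandardError => e
    on_error.call(e, event_name)
    nil
  end
end

errors = []
listener = safe_listener(on_error: ->(error, name) { errors << [name, error.message] }) do |_name, attrs|
  Integer(attrs.fetch(:total_tokens)) # raises KeyError when the attribute is missing
end

first_result = listener.call('lm.tokens', { total_tokens: 5 })
listener.call('lm.tokens', {}) # error captured in `errors`, returns nil
```

In an application, the returned lambda would be passed as a block, e.g. `DSPy.events.subscribe('lm.tokens', &listener)`. This assumes the bus yields exactly `(event_name, attrs)`, as the examples above do.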