# DSPy.rb Observability

DSPy.rb provides an event-driven observability system built on OpenTelemetry. The system replaces monkey-patching with structured event emission, pluggable listeners, automatic span creation, and non-blocking Langfuse export.

## Event System

### Emitting Events

Emit structured events with `DSPy.event`:

```ruby
DSPy.event('lm.tokens', {
  'gen_ai.system' => 'openai',
  'gen_ai.request.model' => 'gpt-4',
  input_tokens: 150,
  output_tokens: 50,
  total_tokens: 200
})
```

Event names are **strings** with dot-separated namespaces (e.g., `'llm.generate'`, `'react.iteration_complete'`, `'chain_of_thought.reasoning_complete'`). Do not use symbols for event names.

Attributes must be JSON-serializable. DSPy automatically merges context (trace ID, module stack) and creates OpenTelemetry spans.
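
To make the JSON-serializable requirement concrete, here is a minimal sketch of a pre-flight check an application could run before emitting an event. The `json_safe?` helper is hypothetical and not part of DSPy.rb; treating symbols as acceptable is an assumption based on the symbol keys used in the example above.

```ruby
# Hypothetical helper (not part of DSPy.rb): returns true when +value+ is built
# only from types that map directly onto JSON.
def json_safe?(value)
  case value
  when nil, true, false, String, Integer
    true
  when Float
    value.finite? # NaN and Infinity have no JSON representation
  when Symbol
    true # assumption: symbols are fine because they serialize as strings
  when Array
    value.all? { |item| json_safe?(item) }
  when Hash
    value.all? do |key, item|
      (key.is_a?(String) || key.is_a?(Symbol)) && json_safe?(item)
    end
  else
    false
  end
end

attrs = {
  'gen_ai.system' => 'openai',
  input_tokens: 150,
  tags: ['eval', 'nightly']
}

json_safe?(attrs)                          # => true
json_safe?(attrs.merge(started: Time.now)) # => false, Time is not a JSON type
```

Converting values such as `Time` to strings before emitting keeps the attributes portable across listeners and exporters.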

### Global Subscriptions

Subscribe to events across the entire application with `DSPy.events.subscribe`:

```ruby
# Exact event name
subscription_id = DSPy.events.subscribe('lm.tokens') do |event_name, attrs|
  puts "Tokens used: #{attrs[:total_tokens]}"
end

# Wildcard pattern -- matches llm.generate, llm.stream, etc.
DSPy.events.subscribe('llm.*') do |event_name, attrs|
  track_llm_usage(attrs)
end

# Catch-all wildcard
DSPy.events.subscribe('*') do |event_name, attrs|
  log_everything(event_name, attrs)
end
```

Use global subscriptions for cross-cutting concerns: observability exporters (Langfuse, Datadog), centralized logging, metrics collection.
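
The pattern semantics can be pictured with a small standalone matcher. This is a sketch only: `event_pattern_to_regexp` and `event_matches?` are hypothetical names, and the real matching logic inside DSPy.rb may differ. It assumes `*` stands for any run of characters and everything else matches literally.

```ruby
# Hypothetical helpers (not DSPy.rb's implementation): turn a subscription
# pattern into an anchored regular expression, treating '*' as "any characters".
def event_pattern_to_regexp(pattern)
  escaped = Regexp.escape(pattern) # 'llm.*' becomes 'llm\.\*'
  Regexp.new("\\A#{escaped.gsub('\\*', '.*')}\\z")
end

def event_matches?(pattern, event_name)
  event_pattern_to_regexp(pattern).match?(event_name)
end

event_matches?('lm.tokens', 'lm.tokens') # => true
event_matches?('llm.*', 'llm.generate')  # => true
event_matches?('llm.*', 'lm.tokens')     # => false
event_matches?('*', 'score.create')      # => true
```

Escaping the pattern first means the dot in `'lm.tokens'` only matches a literal dot, so an exact subscription does not accidentally behave like a wildcard.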

### Module-Scoped Subscriptions

Declare listeners inside a `DSPy::Module` subclass. Subscriptions automatically scope to the module instance and its descendants:

```ruby
class ResearchReport < DSPy::Module
  subscribe 'lm.tokens', :track_tokens, scope: :descendants

  def initialize
    super
    @outliner = DSPy::Predict.new(OutlineSignature)
    @writer = DSPy::Predict.new(SectionWriterSignature)
    @token_count = 0
  end

  def forward(question:)
    outline = @outliner.call(question: question)
    outline.sections.map do |title|
      draft = @writer.call(question: question, section_title: title)
      { title: title, body: draft.paragraph }
    end
  end

  def track_tokens(_event, attrs)
    @token_count += attrs.fetch(:total_tokens, 0)
  end
end
```

The `scope:` parameter accepts:

- `:descendants` (default) -- receives events from the module **and** every nested module invoked inside it.
- `DSPy::Module::SubcriptionScope::SelfOnly` -- restricts delivery to events emitted by the module instance itself; ignores descendants.

Inspect active subscriptions with `registered_module_subscriptions`. Tear down with `unsubscribe_module_events`.
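
One way to picture the two scopes is as a filter over the `module_path` metadata attached to each event (see Module Stack Metadata). The sketch below is an illustration under that assumption, not DSPy.rb's implementation: `deliver?` is a hypothetical function, and the `:self_only` symbol stands in for `DSPy::Module::SubcriptionScope::SelfOnly`.

```ruby
# Hypothetical filter (not DSPy.rb's implementation). +module_path+ is the
# root-to-leaf array of { id:, class:, label: } entries attached to an event.
def deliver?(subscriber_id, module_path, scope)
  ids = module_path.map { |entry| entry[:id] }
  case scope
  when :descendants
    ids.include?(subscriber_id) # subscriber is somewhere on the call chain
  when :self_only
    ids.last == subscriber_id   # subscriber emitted the event itself
  else
    raise ArgumentError, "unknown scope: #{scope.inspect}"
  end
end

path = [
  { id: 'report_uuid', class: 'ResearchReport', label: nil },
  { id: 'writer_uuid', class: 'DSPy::Predict', label: 'writer' }
]

deliver?('report_uuid', path, :descendants) # => true, event from a nested predictor
deliver?('report_uuid', path, :self_only)   # => false, the leaf is the writer
deliver?('writer_uuid', path, :self_only)   # => true
```

Under this reading, the `ResearchReport` example above counts tokens spent by both of its predictors because their events carry the report's ID in their path.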

### Unsubscribe and Cleanup

Remove a global listener by subscription ID:

```ruby
id = DSPy.events.subscribe('llm.*') { |name, attrs| }
DSPy.events.unsubscribe(id)
```

Build tracker classes that manage their own subscription lifecycle:

```ruby
class TokenBudgetTracker
  def initialize(budget:)
    @budget = budget
    @usage = 0
    @subscriptions = []
    @subscriptions << DSPy.events.subscribe('lm.tokens') do |_event, attrs|
      @usage += attrs.fetch(:total_tokens, 0)
      warn("Budget hit") if @usage >= @budget
    end
  end

  def unsubscribe
    @subscriptions.each { |id| DSPy.events.unsubscribe(id) }
    @subscriptions.clear
  end
end
```

### Clearing Listeners in Tests

Call `DSPy.events.clear_listeners` in `before`/`after` blocks to prevent cross-contamination between test cases:

```ruby
RSpec.configure do |config|
  config.after(:each) { DSPy.events.clear_listeners }
end
```

## dspy-o11y Gems

Three gems compose the observability stack:

| Gem | Purpose |
|---|---|
| `dspy` | Core event bus (`DSPy.event`, `DSPy.events`) -- always available |
| `dspy-o11y` | OpenTelemetry spans, `AsyncSpanProcessor`, `DSPy::Context.with_span` helpers |
| `dspy-o11y-langfuse` | Langfuse adapter -- configures OTLP exporter targeting Langfuse endpoints |

### Installation

```ruby
# Gemfile
gem 'dspy'
gem 'dspy-o11y'           # core spans + helpers
gem 'dspy-o11y-langfuse'  # Langfuse/OpenTelemetry adapter (optional)
```

If the optional gems are absent, DSPy falls back to logging-only mode with no errors.

## Langfuse Integration

### Environment Variables

```bash
# Required
export LANGFUSE_PUBLIC_KEY=pk-lf-your-public-key
export LANGFUSE_SECRET_KEY=sk-lf-your-secret-key

# Optional (defaults to https://cloud.langfuse.com)
export LANGFUSE_HOST=https://us.cloud.langfuse.com

# Tuning (optional)
export DSPY_TELEMETRY_BATCH_SIZE=100       # spans per export batch (default 100)
export DSPY_TELEMETRY_QUEUE_SIZE=1000      # max queued spans (default 1000)
export DSPY_TELEMETRY_EXPORT_INTERVAL=60   # seconds between timed exports (default 60)
export DSPY_TELEMETRY_SHUTDOWN_TIMEOUT=10  # seconds to drain on shutdown (default 10)
```
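
If application code needs to know the effective tuning values, for example to log them at boot, it can read the same variables itself. The sketch below assumes the variable names and defaults listed above; `TELEMETRY_DEFAULTS` and `telemetry_settings` are hypothetical names and not part of DSPy.rb.

```ruby
# Hypothetical helper for application code (not part of DSPy.rb).
# Names and defaults mirror the tuning variables documented above.
TELEMETRY_DEFAULTS = {
  'DSPY_TELEMETRY_BATCH_SIZE' => 100,
  'DSPY_TELEMETRY_QUEUE_SIZE' => 1000,
  'DSPY_TELEMETRY_EXPORT_INTERVAL' => 60,
  'DSPY_TELEMETRY_SHUTDOWN_TIMEOUT' => 10
}.freeze

# +env+ defaults to the process environment; pass a Hash to test overrides.
def telemetry_settings(env = ENV)
  TELEMETRY_DEFAULTS.to_h do |name, default|
    # Integer() raises ArgumentError on values such as "abc"
    [name, Integer(env.fetch(name, default))]
  end
end

overrides = { 'DSPY_TELEMETRY_BATCH_SIZE' => '250' }
telemetry_settings(overrides)['DSPY_TELEMETRY_BATCH_SIZE'] # => 250
telemetry_settings({})['DSPY_TELEMETRY_QUEUE_SIZE']        # => 1000
```

Using `Integer()` rather than `to_i` surfaces a malformed value at boot instead of silently treating it as zero.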

### Automatic Configuration

`DSPy::Observability.configure!` must run once at boot. It is called automatically when `require 'dspy'` runs and the Langfuse env vars are present, so no explicit call is needed in that case:

```ruby
require 'dspy'
# If LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY are set,
# DSPy::Observability.configure! runs automatically and:
# 1. Configures the OpenTelemetry SDK with an OTLP exporter
# 2. Creates dual output: structured logs AND OpenTelemetry spans
# 3. Exports spans to Langfuse using proper authentication
# 4. Falls back gracefully if gems are missing
```

Verify status with `DSPy::Observability.enabled?`.

### Automatic Tracing

With observability enabled, every `DSPy::Module#forward` call, LM request, and tool invocation creates properly nested spans. Langfuse receives hierarchical traces:

```
Trace: abc-123-def
+-- ChainOfThought.forward [2000ms] (observation type: chain)
    +-- llm.generate [1000ms] (observation type: generation)
        Model: gpt-4-0613
        Tokens: 100 in / 50 out / 150 total
```

DSPy maps module classes to Langfuse observation types automatically via `DSPy::ObservationType.for_module_class`:

| Module | Observation Type |
|---|---|
| `DSPy::LM` (raw chat) | `generation` |
| `DSPy::ChainOfThought` | `chain` |
| `DSPy::ReAct` | `agent` |
| Tool invocations | `tool` |
| Memory/retrieval | `retriever` |
| Embedding engines | `embedding` |
| Evaluation modules | `evaluator` |
| Generic operations | `span` |

## Score Reporting

### DSPy.score API

Report evaluation scores with `DSPy.score`:

```ruby
# Numeric (default)
DSPy.score('accuracy', 0.95)

# With comment
DSPy.score('relevance', 0.87, comment: 'High semantic similarity')

# Boolean
DSPy.score('is_valid', 1, data_type: DSPy::Scores::DataType::Boolean)

# Categorical
DSPy.score('sentiment', 'positive', data_type: DSPy::Scores::DataType::Categorical)

# Explicit trace binding
DSPy.score('accuracy', 0.95, trace_id: 'custom-trace-id')
```

Available data types: `DSPy::Scores::DataType::Numeric`, `::Boolean`, `::Categorical`.

### score.create Events

Every `DSPy.score` call emits a `'score.create'` event. Subscribe to react:

```ruby
DSPy.events.subscribe('score.create') do |event_name, attrs|
  puts "#{attrs[:score_name]} = #{attrs[:score_value]}"
  # Also available: attrs[:score_id], attrs[:score_data_type],
  # attrs[:score_comment], attrs[:trace_id], attrs[:observation_id],
  # attrs[:timestamp]
end
```

### Async Langfuse Export with DSPy::Scores::Exporter

Configure the exporter to send scores to Langfuse in the background:

```ruby
exporter = DSPy::Scores::Exporter.configure(
  public_key: ENV['LANGFUSE_PUBLIC_KEY'],
  secret_key: ENV['LANGFUSE_SECRET_KEY'],
  host: 'https://cloud.langfuse.com'
)

# Scores are now exported automatically via a background Thread::Queue
DSPy.score('accuracy', 0.95)

# Shut down gracefully (waits up to 5 seconds by default)
exporter.shutdown
```

The exporter subscribes to `'score.create'` events internally, queues them for async processing, and retries with exponential backoff on failure.

### Automatic Export with DSPy::Evals

Pass `export_scores: true` to `DSPy::Evals` to export per-example scores and an aggregate batch score automatically:

```ruby
evaluator = DSPy::Evals.new(
  program,
  metric: my_metric,
  export_scores: true,
  score_name: 'qa_accuracy'
)

result = evaluator.evaluate(test_examples)
```

## DSPy::Context.with_span

Create manual spans for custom operations. Requires `dspy-o11y`.

```ruby
DSPy::Context.with_span(operation: 'custom.retrieval', 'retrieval.source' => 'pinecone') do |span|
  results = pinecone_client.query(embedding)
  # span is nil when observability is disabled, so use safe navigation
  span&.set_attribute('retrieval.count', results.size)
  results
end
```

Pass semantic attributes as keyword arguments alongside `operation:`. The block receives an OpenTelemetry span object (or `nil` when observability is disabled). The span automatically nests under the current parent span and records `duration.ms`, `langfuse.observation.startTime`, and `langfuse.observation.endTime`.

Assign a Langfuse observation type to custom spans:

```ruby
DSPy::Context.with_span(
  operation: 'evaluate.batch',
  **DSPy::ObservationType::Evaluator.langfuse_attributes,
  'batch.size' => examples.length
) do |span|
  run_evaluation(examples)
end
```

Scores reported inside a `with_span` block automatically inherit the current trace context.

## Module Stack Metadata

When `DSPy::Module#forward` runs, the context layer maintains a module stack. Every event includes:

```ruby
{
  module_path: [
    { id: "root_uuid", class: "DeepSearch", label: nil },
    { id: "planner_uuid", class: "DSPy::Predict", label: "planner" }
  ],
  module_root: { id: "root_uuid", class: "DeepSearch", label: nil },
  module_leaf: { id: "planner_uuid", class: "DSPy::Predict", label: "planner" },
  module_scope: {
    ancestry_token: "root_uuid>planner_uuid",
    depth: 2
  }
}
```

| Key | Meaning |
|---|---|
| `module_path` | Ordered array of `{id, class, label}` entries from root to leaf |
| `module_root` | The outermost module in the current call chain |
| `module_leaf` | The innermost (currently executing) module |
| `module_scope.ancestry_token` | Stable string of joined UUIDs representing the nesting path |
| `module_scope.depth` | Integer depth of the current module in the stack |

Labels are set via `module_scope_label=` on a module instance or derived automatically from named predictors. Use this metadata to power Langfuse filters, scoped metrics, or custom event routing.
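
As an illustration of how the derived fields relate to `module_path`, and of the custom routing mentioned above, here is a small sketch over plain hashes. Both helpers are hypothetical. The `'>'` separator is inferred from the example token rather than taken from the library source.

```ruby
# Hypothetical reconstruction of the derived fields from +module_path+.
# Assumption: the ancestry token is the entry ids joined with '>'.
def module_scope_for(module_path)
  {
    ancestry_token: module_path.map { |entry| entry[:id] }.join('>'),
    depth: module_path.length
  }
end

# Hypothetical routing helper: keep only events whose call chain starts at
# a module of the given class.
def events_under(events, root_class)
  events.select { |attrs| attrs.dig(:module_root, :class) == root_class }
end

path = [
  { id: 'root_uuid', class: 'DeepSearch', label: nil },
  { id: 'planner_uuid', class: 'DSPy::Predict', label: 'planner' }
]

module_scope_for(path)
# => { ancestry_token: "root_uuid>planner_uuid", depth: 2 }

events = [
  { total_tokens: 200, module_root: path.first },
  { total_tokens: 50, module_root: { id: 'x', class: 'Summarizer', label: nil } }
]
events_under(events, 'DeepSearch').sum { |attrs| attrs[:total_tokens] } # => 200
```

A listener subscribed to `'lm.tokens'` could apply the same `module_root` check to attribute token usage to a specific top-level module.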

## Dedicated Export Worker

The `DSPy::Observability::AsyncSpanProcessor` (from `dspy-o11y`) keeps telemetry export off the hot path:

- Runs on a `Concurrent::SingleThreadExecutor` -- LLM workflows never compete with OTLP networking.
- Buffers finished spans in a `Thread::Queue` (max size configurable via `DSPY_TELEMETRY_QUEUE_SIZE`).
- Drains spans in batches of `DSPY_TELEMETRY_BATCH_SIZE` (default 100). When the queue reaches batch size, an immediate async export fires.
- A background timer thread triggers periodic export every `DSPY_TELEMETRY_EXPORT_INTERVAL` seconds (default 60).
- Applies exponential backoff (`0.1 * 2^attempt` seconds) on export failures, up to `DEFAULT_MAX_RETRIES` (3).
- On shutdown, flushes all remaining spans within `DSPY_TELEMETRY_SHUTDOWN_TIMEOUT` seconds, then terminates the executor.
- Drops the oldest span when the queue is full, logging `'observability.span_dropped'`.

No application code interacts with the processor directly. Configure it entirely through environment variables.
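
The backoff formula in the list above works out to short, doubling waits. The following standalone sketch reproduces that schedule. It assumes attempts are numbered from 0 and that three retries follow the first try; `export_with_retries` is a hypothetical wrapper, not the processor's actual code.

```ruby
BASE_DELAY = 0.1  # seconds, from the formula 0.1 * 2^attempt
MAX_RETRIES = 3   # mirrors DEFAULT_MAX_RETRIES

# Assumption: +attempt+ starts at 0 for the first retry.
def backoff_delay(attempt)
  BASE_DELAY * (2**attempt)
end

# Hypothetical retry wrapper. +sleeper+ is injectable so the sketch can be
# exercised without actually waiting.
def export_with_retries(sleeper: ->(seconds) { sleep(seconds) })
  attempt = 0
  begin
    yield
  rescue StandardError
    raise if attempt >= MAX_RETRIES # give up and re-raise the last error
    sleeper.call(backoff_delay(attempt))
    attempt += 1
    retry
  end
end

(0...MAX_RETRIES).map { |attempt| backoff_delay(attempt) } # => [0.1, 0.2, 0.4]
```

Under these assumptions a failing export is retried after 0.1, 0.2, and 0.4 seconds, so a batch is abandoned after well under a second of cumulative waiting.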

## Built-in Events Reference

| Event Name | Emitted By | Key Attributes |
|---|---|---|
| `lm.tokens` | `DSPy::LM` | `gen_ai.system`, `gen_ai.request.model`, `input_tokens`, `output_tokens`, `total_tokens` |
| `chain_of_thought.reasoning_complete` | `DSPy::ChainOfThought` | `dspy.signature`, `cot.reasoning_steps`, `cot.reasoning_length`, `cot.has_reasoning` |
| `react.iteration_complete` | `DSPy::ReAct` | `iteration`, `thought`, `action`, `observation` |
| `codeact.iteration_complete` | `dspy-code_act` gem | `iteration`, `code_executed`, `execution_result` |
| `optimization.trial_complete` | Teleprompters (MIPROv2) | `trial_number`, `score` |
| `score.create` | `DSPy.score` | `score_name`, `score_value`, `score_data_type`, `trace_id` |
| `span.start` | `DSPy::Context.with_span` | `trace_id`, `span_id`, `parent_span_id`, `operation` |

## Best Practices

- Use dot-separated string names for events. Follow OpenTelemetry `gen_ai.*` conventions for LLM attributes.
- Always call `unsubscribe` (or `unsubscribe_module_events` for scoped subscriptions) when a tracker is no longer needed to prevent memory leaks.
- Call `DSPy.events.clear_listeners` in test teardown to avoid cross-contamination.
- Wrap risky listener logic in a rescue block. The event system isolates listener failures, but explicit rescue prevents silent swallowing of domain errors.
- Prefer module-scoped `subscribe` for agent internals. Reserve global `DSPy.events.subscribe` for infrastructure-level concerns.
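
The advice on rescuing inside listeners can be sketched with a plain lambda. The names below are hypothetical. In an application the same body would be the block passed to `DSPy.events.subscribe('lm.tokens')`, and the error array would more likely be a logger or error tracker.

```ruby
listener_errors = [] # stand-in for a logger or error tracker
token_totals = []

# Hypothetical listener body with an explicit rescue, so a malformed event is
# recorded where the application can see it instead of vanishing.
token_listener = lambda do |event_name, attrs|
  token_totals << attrs.fetch(:total_tokens) # raises KeyError when missing
rescue KeyError => e
  listener_errors << "#{event_name}: #{e.message}"
end

token_listener.call('lm.tokens', { total_tokens: 200 })
token_listener.call('lm.tokens', {}) # recorded as an error, does not raise

token_totals           # => [200]
listener_errors.length # => 1
```

Rescuing a specific error class keeps unexpected failures visible while still handling the case the listener knows how to report.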