claude-engineering-plugin/plugins/compound-engineering/skills/dspy-ruby/references/observability.md
Vicente Reig Rincón de Arellano e8f3bbcb35 refactor(skills): update dspy-ruby skill to DSPy.rb v0.34.3 API (#162)
Rewrite all reference files, asset templates, and SKILL.md to use
current API patterns (.call(), result.field, T::Enum classes,
Tools::Base). Add two new reference files (toolsets, observability)
covering tools DSL, event system, and Langfuse integration.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 12:01:43 -06:00

# DSPy.rb Observability
DSPy.rb provides an event-driven observability system built on OpenTelemetry. The system replaces monkey-patching with structured event emission, pluggable listeners, automatic span creation, and non-blocking Langfuse export.
## Event System
### Emitting Events
Emit structured events with `DSPy.event`:
```ruby
DSPy.event('lm.tokens', {
  'gen_ai.system' => 'openai',
  'gen_ai.request.model' => 'gpt-4',
  input_tokens: 150,
  output_tokens: 50,
  total_tokens: 200
})
```
Event names are **strings** with dot-separated namespaces (e.g., `'llm.generate'`, `'react.iteration_complete'`, `'chain_of_thought.reasoning_complete'`). Do not use symbols for event names.
Attributes must be JSON-serializable. DSPy automatically merges context (trace ID, module stack) and creates OpenTelemetry spans.
### Global Subscriptions
Subscribe to events across the entire application with `DSPy.events.subscribe`:
```ruby
# Exact event name
subscription_id = DSPy.events.subscribe('lm.tokens') do |event_name, attrs|
  puts "Tokens used: #{attrs[:total_tokens]}"
end

# Wildcard pattern -- matches llm.generate, llm.stream, etc.
DSPy.events.subscribe('llm.*') do |event_name, attrs|
  track_llm_usage(attrs)
end

# Catch-all wildcard
DSPy.events.subscribe('*') do |event_name, attrs|
  log_everything(event_name, attrs)
end
```
Use global subscriptions for cross-cutting concerns: observability exporters (Langfuse, Datadog), centralized logging, metrics collection.
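The three pattern forms used with `DSPy.events.subscribe` (exact name, namespace wildcard, catch-all) can be pictured with a small standalone matcher. This is only a sketch of the semantics described in this section, not DSPy.rb's own matching code; `event_pattern_match?` is a hypothetical helper name.

```ruby
# Hypothetical helper: mimics the matching rules described in this section.
# Not part of DSPy.rb; the library's real matcher may differ.
def event_pattern_match?(pattern, event_name)
  return true if pattern == '*' # catch-all

  if pattern.end_with?('.*')
    prefix = pattern.delete_suffix('*') # 'llm.*' -> 'llm.'
    return event_name.start_with?(prefix)
  end

  pattern == event_name # exact match
end

exact_hit     = event_pattern_match?('lm.tokens', 'lm.tokens')
namespace_hit = event_pattern_match?('llm.*', 'llm.generate')
namespace_miss = event_pattern_match?('llm.*', 'lm.tokens')
catch_all_hit = event_pattern_match?('*', 'score.create')
```

Note that under these rules `'llm.*'` does not match `'lm.tokens'`: the two namespaces differ by one letter, which is an easy source of silently idle listeners.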
### Module-Scoped Subscriptions
Declare listeners inside a `DSPy::Module` subclass. Subscriptions automatically scope to the module instance and its descendants:
```ruby
class ResearchReport < DSPy::Module
  subscribe 'lm.tokens', :track_tokens, scope: :descendants

  def initialize
    super
    @outliner = DSPy::Predict.new(OutlineSignature)
    @writer = DSPy::Predict.new(SectionWriterSignature)
    @token_count = 0
  end

  def forward(question:)
    outline = @outliner.call(question: question)
    outline.sections.map do |title|
      draft = @writer.call(question: question, section_title: title)
      { title: title, body: draft.paragraph }
    end
  end

  def track_tokens(_event, attrs)
    @token_count += attrs.fetch(:total_tokens, 0)
  end
end
```
The `scope:` parameter accepts:
- `:descendants` (default) -- receives events from the module **and** every nested module invoked inside it.
- `DSPy::Module::SubcriptionScope::SelfOnly` -- restricts delivery to events emitted by the module instance itself; ignores descendants.
Inspect active subscriptions with `registered_module_subscriptions`. Tear down with `unsubscribe_module_events`.
### Unsubscribe and Cleanup
Remove a global listener by subscription ID:
```ruby
id = DSPy.events.subscribe('llm.*') { |name, attrs| }
DSPy.events.unsubscribe(id)
```
Build tracker classes that manage their own subscription lifecycle:
```ruby
class TokenBudgetTracker
  def initialize(budget:)
    @budget = budget
    @usage = 0
    @subscriptions = []
    @subscriptions << DSPy.events.subscribe('lm.tokens') do |_event, attrs|
      @usage += attrs.fetch(:total_tokens, 0)
      warn("Budget hit") if @usage >= @budget
    end
  end

  def unsubscribe
    @subscriptions.each { |id| DSPy.events.unsubscribe(id) }
    @subscriptions.clear
  end
end
```
### Clearing Listeners in Tests
Call `DSPy.events.clear_listeners` in `before`/`after` blocks to prevent cross-contamination between test cases:
```ruby
RSpec.configure do |config|
  config.after(:each) { DSPy.events.clear_listeners }
end
```
## dspy-o11y Gems
Three gems compose the observability stack:
| Gem | Purpose |
|---|---|
| `dspy` | Core event bus (`DSPy.event`, `DSPy.events`) -- always available |
| `dspy-o11y` | OpenTelemetry spans, `AsyncSpanProcessor`, `DSPy::Context.with_span` helpers |
| `dspy-o11y-langfuse` | Langfuse adapter -- configures OTLP exporter targeting Langfuse endpoints |
### Installation
```ruby
# Gemfile
gem 'dspy'
gem 'dspy-o11y' # core spans + helpers
gem 'dspy-o11y-langfuse' # Langfuse/OpenTelemetry adapter (optional)
```
If the optional gems are absent, DSPy falls back to logging-only mode with no errors.
## Langfuse Integration
### Environment Variables
```bash
# Required
export LANGFUSE_PUBLIC_KEY=pk-lf-your-public-key
export LANGFUSE_SECRET_KEY=sk-lf-your-secret-key
# Optional (defaults to https://cloud.langfuse.com)
export LANGFUSE_HOST=https://us.cloud.langfuse.com
# Tuning (optional)
export DSPY_TELEMETRY_BATCH_SIZE=100 # spans per export batch (default 100)
export DSPY_TELEMETRY_QUEUE_SIZE=1000 # max queued spans (default 1000)
export DSPY_TELEMETRY_EXPORT_INTERVAL=60 # seconds between timed exports (default 60)
export DSPY_TELEMETRY_SHUTDOWN_TIMEOUT=10 # seconds to drain on shutdown (default 10)
```
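To make the default resolution concrete, the sketch below reads the four tuning variables from Ruby and falls back to the documented defaults. It assumes the values are whole numbers and that malformed values should be ignored; `telemetry_setting` and `TELEMETRY_DEFAULTS` are hypothetical names, and the gems' own parsing may differ.

```ruby
# Hypothetical helper: reads an integer tuning knob from the environment,
# falling back to the documented default when unset or malformed.
def telemetry_setting(name, default)
  Integer(ENV.fetch(name, default.to_s))
rescue ArgumentError
  default # e.g. DSPY_TELEMETRY_BATCH_SIZE=abc
end

# Defaults as listed in the environment variable block above.
TELEMETRY_DEFAULTS = {
  'DSPY_TELEMETRY_BATCH_SIZE' => 100,
  'DSPY_TELEMETRY_QUEUE_SIZE' => 1000,
  'DSPY_TELEMETRY_EXPORT_INTERVAL' => 60,
  'DSPY_TELEMETRY_SHUTDOWN_TIMEOUT' => 10
}.freeze

settings = TELEMETRY_DEFAULTS.to_h do |name, default|
  [name, telemetry_setting(name, default)]
end
```

Printing `settings` at boot is a cheap way to confirm which values a process actually picked up.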
### Automatic Configuration
`DSPy::Observability.configure!` only needs to run once at boot. When the Langfuse environment variables are present, `require 'dspy'` already calls it automatically, so no explicit call is required:
```ruby
require 'dspy'
# If LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY are set,
# DSPy::Observability.configure! runs automatically and:
# 1. Configures the OpenTelemetry SDK with an OTLP exporter
# 2. Creates dual output: structured logs AND OpenTelemetry spans
# 3. Exports spans to Langfuse using proper authentication
# 4. Falls back gracefully if gems are missing
```
Verify status with `DSPy::Observability.enabled?`.
### Automatic Tracing
With observability enabled, every `DSPy::Module#forward` call, LM request, and tool invocation creates properly nested spans. Langfuse receives hierarchical traces:
```
Trace: abc-123-def
+-- ChainOfThought.forward [2000ms] (observation type: chain)
    +-- llm.generate [1000ms] (observation type: generation)
        Model: gpt-4-0613
        Tokens: 100 in / 50 out / 150 total
```
DSPy maps module classes to Langfuse observation types automatically via `DSPy::ObservationType.for_module_class`:
| Module | Observation Type |
|---|---|
| `DSPy::LM` (raw chat) | `generation` |
| `DSPy::ChainOfThought` | `chain` |
| `DSPy::ReAct` | `agent` |
| Tool invocations | `tool` |
| Memory/retrieval | `retriever` |
| Embedding engines | `embedding` |
| Evaluation modules | `evaluator` |
| Generic operations | `span` |
## Score Reporting
### DSPy.score API
Report evaluation scores with `DSPy.score`:
```ruby
# Numeric (default)
DSPy.score('accuracy', 0.95)
# With comment
DSPy.score('relevance', 0.87, comment: 'High semantic similarity')
# Boolean
DSPy.score('is_valid', 1, data_type: DSPy::Scores::DataType::Boolean)
# Categorical
DSPy.score('sentiment', 'positive', data_type: DSPy::Scores::DataType::Categorical)
# Explicit trace binding
DSPy.score('accuracy', 0.95, trace_id: 'custom-trace-id')
```
Available data types: `DSPy::Scores::DataType::Numeric`, `::Boolean`, `::Categorical`.
### score.create Events
Every `DSPy.score` call emits a `'score.create'` event. Subscribe to react:
```ruby
DSPy.events.subscribe('score.create') do |event_name, attrs|
  puts "#{attrs[:score_name]} = #{attrs[:score_value]}"
  # Also available: attrs[:score_id], attrs[:score_data_type],
  # attrs[:score_comment], attrs[:trace_id], attrs[:observation_id],
  # attrs[:timestamp]
end
```
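A listener for `'score.create'` can also aggregate instead of print. The sketch below keeps a running mean per score name, assuming the symbol-keyed `:score_name` and `:score_value` attributes shown above; `ScoreAggregator` is a hypothetical class, and the events are fed in by hand so the example runs without DSPy loaded.

```ruby
# Hypothetical listener object: keeps a running mean per score name.
class ScoreAggregator
  def initialize
    @sums = Hash.new(0.0)
    @counts = Hash.new(0)
  end

  # Same (event_name, attrs) shape as the subscribe block above.
  def call(_event_name, attrs)
    value = attrs[:score_value]
    return unless value.is_a?(Numeric) # skip categorical scores

    name = attrs[:score_name]
    @sums[name] += value
    @counts[name] += 1
  end

  # Mean of the numeric scores seen so far, or nil if there were none.
  def mean(name)
    return nil if @counts[name].zero?

    @sums[name] / @counts[name]
  end
end

aggregator = ScoreAggregator.new
# With DSPy loaded, the wiring would look like:
#   DSPy.events.subscribe('score.create') { |name, attrs| aggregator.call(name, attrs) }
aggregator.call('score.create', { score_name: 'accuracy', score_value: 0.5 })
aggregator.call('score.create', { score_name: 'accuracy', score_value: 1.0 })
aggregator.call('score.create', { score_name: 'sentiment', score_value: 'positive' })

accuracy_mean = aggregator.mean('accuracy')   # 0.75
sentiment_mean = aggregator.mean('sentiment') # nil, no numeric values recorded
```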
### Async Langfuse Export with DSPy::Scores::Exporter
Configure the exporter to send scores to Langfuse in the background:
```ruby
exporter = DSPy::Scores::Exporter.configure(
  public_key: ENV['LANGFUSE_PUBLIC_KEY'],
  secret_key: ENV['LANGFUSE_SECRET_KEY'],
  host: 'https://cloud.langfuse.com'
)
# Scores are now exported automatically via a background Thread::Queue
DSPy.score('accuracy', 0.95)
# Shut down gracefully (waits up to 5 seconds by default)
exporter.shutdown
```
The exporter subscribes to `'score.create'` events internally, queues them for async processing, and retries with exponential backoff on failure.
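The queue-then-retry pattern described here can be sketched in plain Ruby. This is a simplified stand-in and not the code of `DSPy::Scores::Exporter`: the class name `BackgroundExporter`, the `max_retries` and `base_delay` parameters, and the sentinel-based shutdown are all assumptions made for illustration.

```ruby
# Hypothetical stand-in for a background exporter (not DSPy::Scores::Exporter).
class BackgroundExporter
  STOP = Object.new.freeze # sentinel that tells the worker to exit

  def initialize(max_retries: 3, base_delay: 0.01, &sender)
    @queue = Thread::Queue.new
    @max_retries = max_retries
    @base_delay = base_delay
    @sender = sender
    @worker = Thread.new { run }
  end

  # Called from the hot path: enqueue and return immediately.
  def enqueue(item)
    @queue << item
  end

  # Let the worker drain what is queued, waiting at most `timeout` seconds.
  def shutdown(timeout: 5)
    @queue << STOP
    @worker.join(timeout)
  end

  private

  def run
    while (item = @queue.pop) != STOP
      deliver(item)
    end
  end

  # Retry a failed send with exponentially growing sleeps, then give up.
  def deliver(item)
    attempt = 0
    begin
      @sender.call(item)
    rescue StandardError
      attempt += 1
      return if attempt > @max_retries

      sleep(@base_delay * (2**(attempt - 1)))
      retry
    end
  end
end

sent = []
failures = 0
exporter = BackgroundExporter.new do |score|
  if failures < 2 # simulate two transient network errors
    failures += 1
    raise IOError, 'simulated network error'
  end
  sent << score
end

exporter.enqueue({ name: 'accuracy', value: 0.95 })
exporter.shutdown
```

The caller only pays for a queue push; the simulated failures and the sleeps between retries happen entirely on the worker thread.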
### Automatic Export with DSPy::Evals
Pass `export_scores: true` to `DSPy::Evals` to export per-example scores and an aggregate batch score automatically:
```ruby
evaluator = DSPy::Evals.new(
  program,
  metric: my_metric,
  export_scores: true,
  score_name: 'qa_accuracy'
)
result = evaluator.evaluate(test_examples)
```
## DSPy::Context.with_span
Create manual spans for custom operations. Requires `dspy-o11y`.
```ruby
DSPy::Context.with_span(operation: 'custom.retrieval', 'retrieval.source' => 'pinecone') do |span|
  results = pinecone_client.query(embedding)
  # span is nil when observability is disabled, so use safe navigation
  span&.set_attribute('retrieval.count', results.size)
  results
end
```
Pass semantic attributes as keyword arguments alongside `operation:`. The block receives an OpenTelemetry span object (or `nil` when observability is disabled). The span automatically nests under the current parent span and records `duration.ms`, `langfuse.observation.startTime`, and `langfuse.observation.endTime`.
Assign a Langfuse observation type to custom spans:
```ruby
DSPy::Context.with_span(
  operation: 'evaluate.batch',
  **DSPy::ObservationType::Evaluator.langfuse_attributes,
  'batch.size' => examples.length
) do |span|
  run_evaluation(examples)
end
```
Scores reported inside a `with_span` block automatically inherit the current trace context.
## Module Stack Metadata
When `DSPy::Module#forward` runs, the context layer maintains a module stack. Every event includes:
```ruby
{
  module_path: [
    { id: "root_uuid", class: "DeepSearch", label: nil },
    { id: "planner_uuid", class: "DSPy::Predict", label: "planner" }
  ],
  module_root: { id: "root_uuid", class: "DeepSearch", label: nil },
  module_leaf: { id: "planner_uuid", class: "DSPy::Predict", label: "planner" },
  module_scope: {
    ancestry_token: "root_uuid>planner_uuid",
    depth: 2
  }
}
```
| Key | Meaning |
|---|---|
| `module_path` | Ordered array of `{id, class, label}` entries from root to leaf |
| `module_root` | The outermost module in the current call chain |
| `module_leaf` | The innermost (currently executing) module |
| `module_scope.ancestry_token` | Stable string of joined UUIDs representing the nesting path |
| `module_scope.depth` | Integer depth of the current module in the stack |
Labels are set via `module_scope_label=` on a module instance or derived automatically from named predictors. Use this metadata to power Langfuse filters, scoped metrics, or custom event routing.
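The relationship between these keys can be sketched with plain hashes: every summary key is derivable from `module_path`. The helpers below (`module_stack_metadata`, `emitted_within?`, `emitted_by?`) are hypothetical names for illustration and are not part of DSPy.rb; they assume the symbol-keyed structure shown in the example above.

```ruby
# Hypothetical helper: derives the documented summary keys from a module_path array.
def module_stack_metadata(module_path)
  {
    module_path: module_path,
    module_root: module_path.first,
    module_leaf: module_path.last,
    module_scope: {
      ancestry_token: module_path.map { |entry| entry[:id] }.join('>'),
      depth: module_path.length
    }
  }
end

# Hypothetical filter: event came from `module_id` or anything nested under it.
def emitted_within?(attrs, module_id)
  attrs[:module_path].any? { |entry| entry[:id] == module_id }
end

# Hypothetical filter: event came from `module_id` itself, not from a child.
def emitted_by?(attrs, module_id)
  attrs[:module_leaf][:id] == module_id
end

path = [
  { id: 'root_uuid', class: 'DeepSearch', label: nil },
  { id: 'planner_uuid', class: 'DSPy::Predict', label: 'planner' }
]
attrs = module_stack_metadata(path)

token = attrs[:module_scope][:ancestry_token]     # "root_uuid>planner_uuid"
inside_root = emitted_within?(attrs, 'root_uuid') # true: planner runs under the root
by_root = emitted_by?(attrs, 'root_uuid')         # false: the leaf is the planner
```

Filters like these are one way to route events in a global listener by position in the module tree, for example to count tokens only for calls made under a particular root module.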
## Dedicated Export Worker
The `DSPy::Observability::AsyncSpanProcessor` (from `dspy-o11y`) keeps telemetry export off the hot path:
- Runs on a `Concurrent::SingleThreadExecutor` -- LLM workflows never compete with OTLP networking.
- Buffers finished spans in a `Thread::Queue` (max size configurable via `DSPY_TELEMETRY_QUEUE_SIZE`).
- Drains spans in batches of `DSPY_TELEMETRY_BATCH_SIZE` (default 100). When the queue reaches batch size, an immediate async export fires.
- A background timer thread triggers periodic export every `DSPY_TELEMETRY_EXPORT_INTERVAL` seconds (default 60).
- Applies exponential backoff (`0.1 * 2^attempt` seconds) on export failures, up to `DEFAULT_MAX_RETRIES` (3).
- On shutdown, flushes all remaining spans within `DSPY_TELEMETRY_SHUTDOWN_TIMEOUT` seconds, then terminates the executor.
- Drops the oldest span when the queue is full, logging `'observability.span_dropped'`.
No application code interacts with the processor directly. Configure it entirely through environment variables.
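Two of the behaviors listed above are easy to misread, so here is a standalone sketch of each: the backoff schedule and the drop-oldest buffering. It is not the `AsyncSpanProcessor` source. `backoff_delay` and `DropOldestBuffer` are hypothetical names, the `^` in the documented formula is read as exponentiation (`**` in Ruby), and attempts are assumed to be counted from 0.

```ruby
# Delay before a retry, following the documented 0.1 * 2^attempt formula.
# Assumption: attempts are numbered from 0.
def backoff_delay(attempt, base: 0.1)
  base * (2**attempt)
end

# Hypothetical bounded buffer that evicts the oldest entry when full.
class DropOldestBuffer
  attr_reader :dropped

  def initialize(max_size)
    @max_size = max_size
    @items = []
    @dropped = 0
    @mutex = Mutex.new
  end

  # Always accepts the new item; the oldest one is discarded if needed.
  def push(item)
    @mutex.synchronize do
      if @items.size >= @max_size
        @items.shift
        @dropped += 1
      end
      @items << item
    end
  end

  # Remove and return up to `batch_size` of the oldest remaining items.
  def drain(batch_size)
    @mutex.synchronize { @items.shift(batch_size) }
  end
end

delays = (0...3).map { |attempt| backoff_delay(attempt) } # three retries

buffer = DropOldestBuffer.new(3)
(1..5).each { |span_number| buffer.push(span_number) }
first_batch = buffer.drain(2) # spans 1 and 2 were evicted earlier
```

Dropping the oldest span rather than blocking the producer keeps the application responsive under sustained export failures, at the cost of losing the earliest telemetry first.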
## Built-in Events Reference
| Event Name | Emitted By | Key Attributes |
|---|---|---|
| `lm.tokens` | `DSPy::LM` | `gen_ai.system`, `gen_ai.request.model`, `input_tokens`, `output_tokens`, `total_tokens` |
| `chain_of_thought.reasoning_complete` | `DSPy::ChainOfThought` | `dspy.signature`, `cot.reasoning_steps`, `cot.reasoning_length`, `cot.has_reasoning` |
| `react.iteration_complete` | `DSPy::ReAct` | `iteration`, `thought`, `action`, `observation` |
| `codeact.iteration_complete` | `dspy-code_act` gem | `iteration`, `code_executed`, `execution_result` |
| `optimization.trial_complete` | Teleprompters (MIPROv2) | `trial_number`, `score` |
| `score.create` | `DSPy.score` | `score_name`, `score_value`, `score_data_type`, `trace_id` |
| `span.start` | `DSPy::Context.with_span` | `trace_id`, `span_id`, `parent_span_id`, `operation` |
## Best Practices
- Use dot-separated string names for events. Follow OpenTelemetry `gen_ai.*` conventions for LLM attributes.
- Always call `unsubscribe` (or `unsubscribe_module_events` for scoped subscriptions) when a tracker is no longer needed to prevent memory leaks.
- Call `DSPy.events.clear_listeners` in test teardown to avoid cross-contamination.
- Wrap risky listener logic in a rescue block. The event system isolates listener failures, but explicit rescue prevents silent swallowing of domain errors.
- Prefer module-scoped `subscribe` for agent internals. Reserve global `DSPy.events.subscribe` for infrastructure-level concerns.
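The advice about rescuing inside listeners can be sketched as a small wrapper that logs failures instead of letting them vanish. `safe_listener` is a hypothetical helper, not a DSPy.rb API, and the example invokes the listener by hand so it runs without the gem.

```ruby
require 'logger'
require 'stringio'

# Hypothetical wrapper: runs a listener body and logs its failures
# so domain errors are visible rather than silently swallowed.
def safe_listener(logger:, &handler)
  lambda do |event_name, attrs|
    handler.call(event_name, attrs)
  rescue StandardError => e
    logger.error("listener failed for #{event_name}: #{e.class}: #{e.message}")
    nil
  end
end

log_output = StringIO.new
listener = safe_listener(logger: Logger.new(log_output)) do |_event, attrs|
  raise ArgumentError, 'missing total_tokens' unless attrs.key?(:total_tokens)

  attrs[:total_tokens]
end

ok_result = listener.call('lm.tokens', { total_tokens: 200 })
failed_result = listener.call('lm.tokens', {})
# With DSPy loaded, the wiring would look like:
#   DSPy.events.subscribe('lm.tokens') { |name, attrs| listener.call(name, attrs) }
```

After the second call, the error message is in the log and the caller receives `nil`, so one misbehaving payload does not take down the rest of the listener's work.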