Files
claude-engineering-plugin/plugins/compound-engineering/skills/dspy-ruby/references/observability.md
Vicente Reig Rincón de Arellano e8f3bbcb35 refactor(skills): update dspy-ruby skill to DSPy.rb v0.34.3 API (#162)
Rewrite all reference files, asset templates, and SKILL.md to use
current API patterns (.call(), result.field, T::Enum classes,
Tools::Base). Add two new reference files (toolsets, observability)
covering tools DSL, event system, and Langfuse integration.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 12:01:43 -06:00

13 KiB

DSPy.rb Observability

DSPy.rb provides an event-driven observability system built on OpenTelemetry. The system replaces monkey-patching with structured event emission, pluggable listeners, automatic span creation, and non-blocking Langfuse export.

Event System

Emitting Events

Emit structured events with DSPy.event:

DSPy.event('lm.tokens', {
  'gen_ai.system' => 'openai',
  'gen_ai.request.model' => 'gpt-4',
  input_tokens: 150,
  output_tokens: 50,
  total_tokens: 200
})

Event names are strings with dot-separated namespaces (e.g., 'llm.generate', 'react.iteration_complete', 'chain_of_thought.reasoning_complete'). Do not use symbols for event names.

Attributes must be JSON-serializable. DSPy automatically merges context (trace ID, module stack) and creates OpenTelemetry spans.

Global Subscriptions

Subscribe to events across the entire application with DSPy.events.subscribe:

# Exact event name
subscription_id = DSPy.events.subscribe('lm.tokens') do |event_name, attrs|
  puts "Tokens used: #{attrs[:total_tokens]}"
end

# Wildcard pattern -- matches llm.generate, llm.stream, etc.
DSPy.events.subscribe('llm.*') do |event_name, attrs|
  track_llm_usage(attrs)
end

# Catch-all wildcard
DSPy.events.subscribe('*') do |event_name, attrs|
  log_everything(event_name, attrs)
end

Use global subscriptions for cross-cutting concerns: observability exporters (Langfuse, Datadog), centralized logging, metrics collection.

Module-Scoped Subscriptions

Declare listeners inside a DSPy::Module subclass. Subscriptions automatically scope to the module instance and its descendants:

class ResearchReport < DSPy::Module
  subscribe 'lm.tokens', :track_tokens, scope: :descendants

  def initialize
    super
    @outliner = DSPy::Predict.new(OutlineSignature)
    @writer   = DSPy::Predict.new(SectionWriterSignature)
    @token_count = 0
  end

  def forward(question:)
    outline = @outliner.call(question: question)
    outline.sections.map do |title|
      draft = @writer.call(question: question, section_title: title)
      { title: title, body: draft.paragraph }
    end
  end

  def track_tokens(_event, attrs)
    @token_count += attrs.fetch(:total_tokens, 0)
  end
end

The scope: parameter accepts:

  • :descendants (default) -- receives events from the module and every nested module invoked inside it.
  • DSPy::Module::SubcriptionScope::SelfOnly -- restricts delivery to events emitted by the module instance itself; ignores descendants.

Inspect active subscriptions with registered_module_subscriptions. Tear down with unsubscribe_module_events.

Unsubscribe and Cleanup

Remove a global listener by subscription ID:

id = DSPy.events.subscribe('llm.*') { |name, attrs| }
DSPy.events.unsubscribe(id)

Build tracker classes that manage their own subscription lifecycle:

class TokenBudgetTracker
  def initialize(budget:)
    @budget = budget
    @usage  = 0
    @subscriptions = []
    @subscriptions << DSPy.events.subscribe('lm.tokens') do |_event, attrs|
      @usage += attrs.fetch(:total_tokens, 0)
      warn("Budget hit") if @usage >= @budget
    end
  end

  def unsubscribe
    @subscriptions.each { |id| DSPy.events.unsubscribe(id) }
    @subscriptions.clear
  end
end

Clearing Listeners in Tests

Call DSPy.events.clear_listeners in before/after blocks to prevent cross-contamination between test cases:

RSpec.configure do |config|
  config.after(:each) { DSPy.events.clear_listeners }
end

dspy-o11y Gems

Three gems compose the observability stack:

Gem Purpose
dspy Core event bus (DSPy.event, DSPy.events) -- always available
dspy-o11y OpenTelemetry spans, AsyncSpanProcessor, DSPy::Context.with_span helpers
dspy-o11y-langfuse Langfuse adapter -- configures OTLP exporter targeting Langfuse endpoints

Installation

# Gemfile
gem 'dspy'
gem 'dspy-o11y'           # core spans + helpers
gem 'dspy-o11y-langfuse'  # Langfuse/OpenTelemetry adapter (optional)

If the optional gems are absent, DSPy falls back to logging-only mode with no errors.

Langfuse Integration

Environment Variables

# Required
export LANGFUSE_PUBLIC_KEY=pk-lf-your-public-key
export LANGFUSE_SECRET_KEY=sk-lf-your-secret-key

# Optional (defaults to https://cloud.langfuse.com)
export LANGFUSE_HOST=https://us.cloud.langfuse.com

# Tuning (optional)
export DSPY_TELEMETRY_BATCH_SIZE=100        # spans per export batch (default 100)
export DSPY_TELEMETRY_QUEUE_SIZE=1000       # max queued spans (default 1000)
export DSPY_TELEMETRY_EXPORT_INTERVAL=60    # seconds between timed exports (default 60)
export DSPY_TELEMETRY_SHUTDOWN_TIMEOUT=10   # seconds to drain on shutdown (default 10)

Automatic Configuration

Call DSPy::Observability.configure! once at boot (it is already called automatically when require 'dspy' runs and Langfuse env vars are present):

require 'dspy'
# If LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY are set,
# DSPy::Observability.configure! runs automatically and:
#   1. Configures the OpenTelemetry SDK with an OTLP exporter
#   2. Creates dual output: structured logs AND OpenTelemetry spans
#   3. Exports spans to Langfuse using proper authentication
#   4. Falls back gracefully if gems are missing

Verify status with DSPy::Observability.enabled?.

Automatic Tracing

With observability enabled, every DSPy::Module#forward call, LM request, and tool invocation creates properly nested spans. Langfuse receives hierarchical traces:

Trace: abc-123-def
+-- ChainOfThought.forward [2000ms]  (observation type: chain)
    +-- llm.generate [1000ms]        (observation type: generation)
        Model: gpt-4-0613
        Tokens: 100 in / 50 out / 150 total

DSPy maps module classes to Langfuse observation types automatically via DSPy::ObservationType.for_module_class:

Module Observation Type
DSPy::LM (raw chat) generation
DSPy::ChainOfThought chain
DSPy::ReAct agent
Tool invocations tool
Memory/retrieval retriever
Embedding engines embedding
Evaluation modules evaluator
Generic operations span

Score Reporting

DSPy.score API

Report evaluation scores with DSPy.score:

# Numeric (default)
DSPy.score('accuracy', 0.95)

# With comment
DSPy.score('relevance', 0.87, comment: 'High semantic similarity')

# Boolean
DSPy.score('is_valid', 1, data_type: DSPy::Scores::DataType::Boolean)

# Categorical
DSPy.score('sentiment', 'positive', data_type: DSPy::Scores::DataType::Categorical)

# Explicit trace binding
DSPy.score('accuracy', 0.95, trace_id: 'custom-trace-id')

Available data types: DSPy::Scores::DataType::Numeric, ::Boolean, ::Categorical.

score.create Events

Every DSPy.score call emits a 'score.create' event. Subscribe to react:

DSPy.events.subscribe('score.create') do |event_name, attrs|
  puts "#{attrs[:score_name]} = #{attrs[:score_value]}"
  # Also available: attrs[:score_id], attrs[:score_data_type],
  # attrs[:score_comment], attrs[:trace_id], attrs[:observation_id],
  # attrs[:timestamp]
end

Async Langfuse Export with DSPy::Scores::Exporter

Configure the exporter to send scores to Langfuse in the background:

exporter = DSPy::Scores::Exporter.configure(
  public_key: ENV['LANGFUSE_PUBLIC_KEY'],
  secret_key: ENV['LANGFUSE_SECRET_KEY'],
  host: 'https://cloud.langfuse.com'
)

# Scores are now exported automatically via a background Thread::Queue
DSPy.score('accuracy', 0.95)

# Shut down gracefully (waits up to 5 seconds by default)
exporter.shutdown

The exporter subscribes to 'score.create' events internally, queues them for async processing, and retries with exponential backoff on failure.

Automatic Export with DSPy::Evals

Pass export_scores: true to DSPy::Evals to export per-example scores and an aggregate batch score automatically:

evaluator = DSPy::Evals.new(
  program,
  metric: my_metric,
  export_scores: true,
  score_name: 'qa_accuracy'
)

result = evaluator.evaluate(test_examples)

DSPy::Context.with_span

Create manual spans for custom operations. Requires dspy-o11y.

DSPy::Context.with_span(operation: 'custom.retrieval', 'retrieval.source' => 'pinecone') do |span|
  results = pinecone_client.query(embedding)
  span&.set_attribute('retrieval.count', results.size) if span
  results
end

Pass semantic attributes as keyword arguments alongside operation:. The block receives an OpenTelemetry span object (or nil when observability is disabled). The span automatically nests under the current parent span and records duration.ms, langfuse.observation.startTime, and langfuse.observation.endTime.

Assign a Langfuse observation type to custom spans:

DSPy::Context.with_span(
  operation: 'evaluate.batch',
  **DSPy::ObservationType::Evaluator.langfuse_attributes,
  'batch.size' => examples.length
) do |span|
  run_evaluation(examples)
end

Scores reported inside a with_span block automatically inherit the current trace context.

Module Stack Metadata

When DSPy::Module#forward runs, the context layer maintains a module stack. Every event includes:

{
  module_path: [
    { id: "root_uuid",    class: "DeepSearch",    label: nil },
    { id: "planner_uuid", class: "DSPy::Predict", label: "planner" }
  ],
  module_root: { id: "root_uuid", class: "DeepSearch", label: nil },
  module_leaf: { id: "planner_uuid", class: "DSPy::Predict", label: "planner" },
  module_scope: {
    ancestry_token: "root_uuid>planner_uuid",
    depth: 2
  }
}
Key Meaning
module_path Ordered array of {id, class, label} entries from root to leaf
module_root The outermost module in the current call chain
module_leaf The innermost (currently executing) module
module_scope.ancestry_token Stable string of joined UUIDs representing the nesting path
module_scope.depth Integer depth of the current module in the stack

Labels are set via module_scope_label= on a module instance or derived automatically from named predictors. Use this metadata to power Langfuse filters, scoped metrics, or custom event routing.

Dedicated Export Worker

The DSPy::Observability::AsyncSpanProcessor (from dspy-o11y) keeps telemetry export off the hot path:

  • Runs on a Concurrent::SingleThreadExecutor -- LLM workflows never compete with OTLP networking.
  • Buffers finished spans in a Thread::Queue (max size configurable via DSPY_TELEMETRY_QUEUE_SIZE).
  • Drains spans in batches of DSPY_TELEMETRY_BATCH_SIZE (default 100). When the queue reaches batch size, an immediate async export fires.
  • A background timer thread triggers periodic export every DSPY_TELEMETRY_EXPORT_INTERVAL seconds (default 60).
  • Applies exponential backoff (0.1 * 2^attempt seconds) on export failures, up to DEFAULT_MAX_RETRIES (3).
  • On shutdown, flushes all remaining spans within DSPY_TELEMETRY_SHUTDOWN_TIMEOUT seconds, then terminates the executor.
  • Drops the oldest span when the queue is full, logging 'observability.span_dropped'.

No application code interacts with the processor directly. Configure it entirely through environment variables.

Built-in Events Reference

Event Name Emitted By Key Attributes
lm.tokens DSPy::LM gen_ai.system, gen_ai.request.model, input_tokens, output_tokens, total_tokens
chain_of_thought.reasoning_complete DSPy::ChainOfThought dspy.signature, cot.reasoning_steps, cot.reasoning_length, cot.has_reasoning
react.iteration_complete DSPy::ReAct iteration, thought, action, observation
codeact.iteration_complete dspy-code_act gem iteration, code_executed, execution_result
optimization.trial_complete Teleprompters (MIPROv2) trial_number, score
score.create DSPy.score score_name, score_value, score_data_type, trace_id
span.start DSPy::Context.with_span trace_id, span_id, parent_span_id, operation

Best Practices

  • Use dot-separated string names for events. Follow OpenTelemetry gen_ai.* conventions for LLM attributes.
  • Always call unsubscribe (or unsubscribe_module_events for scoped subscriptions) when a tracker is no longer needed to prevent memory leaks.
  • Call DSPy.events.clear_listeners in test teardown to avoid cross-contamination.
  • Wrap risky listener logic in a rescue block. The event system isolates listener failures, but explicit rescue prevents silent swallowing of domain errors.
  • Prefer module-scoped subscribe for agent internals. Reserve global DSPy.events.subscribe for infrastructure-level concerns.