Observability

ANIP provides callback-based hooks for logging, metrics, tracing, and diagnostics. Hooks are optional and zero-overhead when absent — the runtime only calls hooks that are registered. Hook callbacks are isolated from correctness paths: a throwing hook never affects requests or background workers.

Hook categories

| Category | Hooks | Purpose |
|---|---|---|
| Logging | 8 hooks | Structured log events for invocations, delegation, audit, checkpoints |
| Metrics | 10 hooks | Counters and durations for monitoring dashboards |
| Tracing | 2 hooks | Span lifecycle for distributed tracing (OpenTelemetry, Jaeger, etc.) |
| Diagnostics | 1 hook | Background worker error reporting |

Quick example

from anip_service import ANIPService, ANIPHooks, LoggingHooks, MetricsHooks, TracingHooks

# `statsd` is assumed to be a DogStatsD-style client that accepts `tags`,
# for example `from datadog import statsd`.

hooks = ANIPHooks(
    logging=LoggingHooks(
        on_invocation_start=lambda info: print(
            f"[ANIP] invoke-start capability={info['capability']} subject={info.get('subject')}"
        ),
        on_invocation_end=lambda info: print(
            f"[ANIP] invoke-end capability={info['capability']} "
            f"success={info['success']} duration_ms={info.get('duration_ms')}"
        ),
        on_delegation_failure=lambda info: print(
            f"[ANIP] delegation-fail reason={info.get('reason')}"
        ),
    ),
    metrics=MetricsHooks(
        on_invocation_duration=lambda info: statsd.timing(
            "anip.invoke.duration_ms", info["duration_ms"],
            tags=[f"capability:{info['capability']}", f"success:{info['success']}"],
        ),
        on_delegation_denied=lambda info: statsd.increment(
            "anip.delegation.denied", tags=[f"reason:{info.get('reason')}"],
        ),
    ),
)

service = ANIPService(
    service_id="my-service",
    capabilities=[...],
    hooks=hooks,
    authenticate=...,
)

Logging hooks

All logging hooks receive a dict/map/object with context-specific fields.

| Hook | When it fires | Key fields |
|---|---|---|
| on_invocation_start | Before handler runs | capability, subject, scope, invocation_id |
| on_invocation_end | After handler completes | capability, success, duration_ms, invocation_id |
| on_delegation_failure | Token validation fails | reason, subject |
| on_audit_append | Audit entry written | capability, event_class, invocation_id |
| on_checkpoint_created | Merkle checkpoint built | checkpoint_id, entry_count, merkle_root |
| on_retention_sweep | Old entries purged | deleted_count |
| on_aggregation_flush | Aggregated entries flushed | flushed_count |
| on_streaming_summary | Streaming invocation completed | capability, chunk_count, duration_ms |
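
As a sketch of how these fields might be routed into Python's standard `logging` module instead of `print`, the helper below builds a callback for any logging hook. The helper name `make_log_hook` and the logger name `anip` are illustrative assumptions, not part of ANIP; the field names are the ones listed in the table above.

```python
import logging

logger = logging.getLogger("anip")  # logger name is an arbitrary choice


def make_log_hook(event, fields, level=logging.INFO):
    """Return a callback that logs `event` with the selected info fields.

    Hypothetical helper, not part of ANIP. Missing fields are logged as
    None so that a sparse payload never raises inside the hook.
    """
    def hook(info):
        pairs = " ".join(f"{name}={info.get(name)}" for name in fields)
        logger.log(level, "%s %s", event, pairs)
    return hook


on_invocation_start = make_log_hook(
    "invoke-start", ["capability", "subject", "scope", "invocation_id"]
)
on_checkpoint_created = make_log_hook(
    "checkpoint-created", ["checkpoint_id", "entry_count", "merkle_root"]
)
on_delegation_failure = make_log_hook(
    "delegation-fail", ["reason", "subject"], level=logging.WARNING
)
```

The resulting callables can be passed as keyword arguments to `LoggingHooks(...)` in the same way as the lambdas in the quick example.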

Metrics hooks

| Hook | When it fires | Key fields |
|---|---|---|
| on_invocation_duration | After each invocation | capability, success, duration_ms |
| on_delegation_denied | Delegation check fails | reason |
| on_audit_append_duration | Audit write completes | duration_ms |
| on_checkpoint_created | Checkpoint built | entry_count, duration_ms |
| on_checkpoint_failed | Checkpoint build fails | error |
| on_proof_generated | Merkle proof built | duration_ms |
| on_proof_unavailable | Proof cannot be generated | reason |
| on_retention_deleted | Entries purged | deleted_count |
| on_aggregation_flushed | Aggregated entries flushed | flushed_count |
| on_streaming_delivery_failure | SSE delivery fails | capability, error |
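
For local development or unit tests, it can help to collect metrics in memory before wiring up a real backend. The following is a minimal sketch under that assumption: `InMemoryMetrics` is a hypothetical class, not an ANIP type, and it covers only three of the ten hooks to show the pattern.

```python
from collections import Counter, defaultdict


class InMemoryMetrics:
    """Tiny metrics sink for local testing; not an ANIP class."""

    def __init__(self):
        self.counters = Counter()
        self.durations = defaultdict(list)

    def on_invocation_duration(self, info):
        # Key durations by capability and outcome.
        key = (info["capability"], info["success"])
        self.durations[key].append(info["duration_ms"])

    def on_delegation_denied(self, info):
        self.counters[("delegation.denied", info.get("reason"))] += 1

    def on_retention_deleted(self, info):
        # deleted_count may be absent; add nothing in that case.
        self.counters[("retention.deleted", None)] += info.get("deleted_count", 0)


metrics = InMemoryMetrics()
```

The bound methods (for example `metrics.on_invocation_duration`) take a single `info` argument, so they can be passed to `MetricsHooks(...)` wherever the quick example uses a lambda.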

Tracing hooks

ANIP defines 8 stable span names for distributed tracing integration:

| Span name | Type | Description |
|---|---|---|
| anip.invoke | Request | Top-level invocation span |
| anip.delegation.validate | Request | Token + scope validation |
| anip.handler.execute | Request | Capability handler execution |
| anip.audit.append | Request | Audit entry write |
| anip.checkpoint.create | Background | Merkle checkpoint generation |
| anip.proof.generate | Request | Inclusion proof generation |
| anip.retention.sweep | Background | Audit retention enforcement |
| anip.aggregation.flush | Background | Aggregated entry flush |

Request-path spans nest under anip.invoke. Background spans are root spans.
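
The nesting rule can be written down as a small lookup, which may be handy when asserting on exported traces in tests. This is only a sketch: `SPAN_TYPES` and `expected_parent` are hypothetical names, and the mapping simply restates the table above.

```python
# Span names and types as listed in the table above.
SPAN_TYPES = {
    "anip.invoke": "request",
    "anip.delegation.validate": "request",
    "anip.handler.execute": "request",
    "anip.audit.append": "request",
    "anip.checkpoint.create": "background",
    "anip.proof.generate": "request",
    "anip.retention.sweep": "background",
    "anip.aggregation.flush": "background",
}


def expected_parent(span_name):
    """Return the ANIP span this span should appear beneath, or None for a root."""
    kind = SPAN_TYPES[span_name]  # KeyError for names outside the stable set
    if kind == "background" or span_name == "anip.invoke":
        return None
    return "anip.invoke"
```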

OpenTelemetry integration

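from opentelemetry import trace

from anip_service import ANIPHooks, TracingHooks

tracer = trace.get_tracer("anip")

hooks = ANIPHooks(
    tracing=TracingHooks(
        # Attribute values are stringified so they are always valid span attributes.
        start_span=lambda info: tracer.start_span(
            info["span_name"],
            attributes={k: str(v) for k, v in info.get("attributes", {}).items()},
        ),
        # The object returned by start_span is handed back as info["span"].
        end_span=lambda info: info["span"].end(),
    ),
)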
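
To exercise tracing hooks without an OpenTelemetry exporter, a recording implementation can stand in for the tracer. The sketch below assumes the contract implied by the example above: `start_span` receives `span_name` and optional `attributes`, and whatever it returns is passed back to `end_span` as `info["span"]`. `RecordedSpan` and `finished` are hypothetical names.

```python
import time


class RecordedSpan:
    """Minimal span stand-in that records its name, attributes, and duration."""

    def __init__(self, name, attributes):
        self.name = name
        self.attributes = dict(attributes)
        self.started = time.monotonic()
        self.duration_s = None  # set when the span ends

    def end(self):
        self.duration_s = time.monotonic() - self.started


finished = []  # spans are appended here in the order they end


def start_span(info):
    return RecordedSpan(info["span_name"], info.get("attributes", {}))


def end_span(info):
    span = info["span"]
    span.end()
    finished.append(span)
```

Passing these two functions as `TracingHooks(start_span=start_span, end_span=end_span)` would let a test inspect `finished` after an invocation.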

Health endpoint

The runtime provides a getHealth() method that returns a cached snapshot of storage, checkpoint, retention, and aggregation state. Framework adapters expose this as GET /-/health:

curl http://localhost:9100/-/health

{
  "status": "healthy",
  "storage": { "type": "sqlite", "connected": true },
  "checkpoint": { "last_sequence": 42, "last_created_at": "2026-03-28T10:00:00Z" },
  "retention": { "last_sweep_at": "2026-03-28T09:55:00Z", "deleted_count": 0 },
  "aggregation": { "pending_count": 0 }
}

Enable the health endpoint when mounting:

mount_anip(app, service, health_endpoint=True)
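
A monitoring script might poll this endpoint and flag anything unexpected. The sketch below uses only the fields shown in the sample response; the function names are hypothetical, and since the set of possible `status` values is not listed here, it treats anything other than `"healthy"` as a problem.

```python
import json
import urllib.request


def fetch_health(base_url):
    """GET <base_url>/-/health and return the decoded JSON body."""
    with urllib.request.urlopen(f"{base_url}/-/health", timeout=5) as resp:
        return json.load(resp)


def health_problems(snapshot):
    """Return a list of human-readable problems found in a health snapshot."""
    problems = []
    if snapshot.get("status") != "healthy":
        problems.append(f"status is {snapshot.get('status')!r}")
    if not snapshot.get("storage", {}).get("connected", False):
        problems.append("storage is not connected")
    return problems
```

For example, `health_problems(fetch_health("http://localhost:9100"))` returns an empty list for the sample response above.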

Diagnostics hook

The diagnostics hook catches errors from background workers (checkpoint scheduler, retention sweeper, aggregation flusher) that would otherwise be silently swallowed:

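from anip_service import ANIPHooks, DiagnosticsHooks

# `sentry` stands in for your error-reporting client.
hooks = ANIPHooks(
    diagnostics=DiagnosticsHooks(
        on_background_error=lambda info: sentry.capture_exception(
            info.get("error"),
            extra={"worker": info.get("worker"), "context": info.get("context")},
        ),
    ),
)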
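
If no error-reporting service is available, the same hook can write to the standard `logging` module. This is a minimal sketch: the logger name `anip.background` is an arbitrary choice, and the `worker`, `error`, and `context` fields are the ones used in the example above.

```python
import logging

bg_logger = logging.getLogger("anip.background")


def on_background_error(info):
    """Log a background worker failure, with its traceback when available."""
    error = info.get("error")
    bg_logger.error(
        "background worker failed worker=%s context=%s",
        info.get("worker"),
        info.get("context"),
        # exc_info accepts an exception instance; skip it for anything else.
        exc_info=error if isinstance(error, BaseException) else None,
    )
```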

Hook isolation

Hooks are isolated from correctness paths. If a hook callback throws an exception:

  • The request or background operation completes normally
  • The exception is logged (via diagnostics hook if registered, or stderr)
  • No data is lost or corrupted

This means you can safely connect hooks to external systems (Datadog, Sentry, Prometheus, etc.) without risking service stability.
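
To make the isolation guarantee concrete, the following sketch shows one way a runtime could invoke optional hooks. It is an illustration of the behavior described above, not ANIP's actual implementation; `call_hook` and `on_hook_error` are hypothetical names.

```python
import sys
import traceback


def call_hook(hook, info, on_hook_error=None):
    """Invoke an optional hook without letting its failure propagate."""
    if hook is None:
        return  # unregistered hooks cost nothing beyond this check
    try:
        hook(info)
    except Exception as exc:
        if on_hook_error is not None:
            try:
                on_hook_error({"error": exc, "context": "hook"})
                return
            except Exception:
                pass  # fall through to stderr if the reporter also fails
        traceback.print_exception(type(exc), exc, exc.__traceback__, file=sys.stderr)
```

Because the exception is caught inside `call_hook`, the caller continues with the request or background operation regardless of what the hook does.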
