Writing an Oracle
An oracle is the component that resolves conflicts between competing claims when the engine cannot resolve them deterministically. This is mempill’s primary differentiator: conflict resolution is pluggable, auditable, and human-overridable — not hidden inside a statistical model.
For the conceptual foundation see Concepts: Oracle Loop and Concepts: Human-in-the-Loop.
The oracle protocol (Python)
Section titled “The oracle protocol (Python)”The Python oracle is duck-typed: any Python object with a single method qualifies. No base class or import from mempill is required:
def request_adjudication(self, agent_id: str, request: dict) -> str: ...| Parameter | Type | Description |
|---|---|---|
agent_id |
str |
The agent whose claim triggered the conflict |
request |
dict |
Conflict context (subject, predicate, incumbent, challenger values) |
| Return | str |
A handle_id UUID string — the engine stores it in the durable pending queue |
The oracle returns a handle_id immediately. It does not decide the verdict at call
time. The verdict arrives later via engine.submit_adjudication(...). The engine owns the
durable queue; the oracle only generates the handle.
Wiring the oracle
Section titled “Wiring the oracle”import mempill
class MyOracle: def request_adjudication(self, agent_id: str, request: dict) -> str: import uuid # Log the request, push to a queue, etc. # Return a fresh handle_id — the engine records the rest. return str(uuid.uuid4())
oracle = MyOracle()
# Ephemeral in-memory engine with oracle:engine = mempill.open_oracle_in_memory(oracle)
# Or file-backed:engine = mempill.open_oracle("/path/to/agent.db", oracle)These constructors (from mempill/__init__.py) return OracleEngine (PyOracleEngine),
which exposes all the regular engine methods plus the three oracle-specific methods:
list_pending_adjudications, submit_adjudication, and sweep_expired_adjudications.
What happens when a conflict arrives
Section titled “What happens when a conflict arrives”- Agent ingests a claim that conflicts with an existing committed claim.
- The AdjudicationGate (C7) detects the conflict and calls
oracle.request_adjudication(agent_id, request). - The oracle returns a
handle_id. - The engine writes a
PendingAdjudicationRowto the durable pending store (survives restart). - The ingest response returns
disposition = "QueuedForAdjudication". query_memoryreturnsstatus = "Contested"with both values inalternatives.- A later call to
submit_adjudicationresolves the conflict.
Listing pending adjudications
Section titled “Listing pending adjudications”pending = engine.list_pending_adjudications(agent_id="my-agent")
for item in pending: print(item["handle_id"]) # UUID string print(item["subject"]) # e.g. "acme:ceo" print(item["predicate"]) # e.g. "held_by" print(item["incumbent_value"]) # the existing committed value print(item["challenger_value"]) # the new conflicting value print(item["queued_at"]) # ISO-8601 UTC timestampSubmitting a verdict
Section titled “Submitting a verdict”result = engine.submit_adjudication({ "handle_id": "45c186e5-...", # UUID from list_pending_adjudications "verdict": "Affirm", # "Affirm" | "Deny" | "Unknown" "evidence_provenance": mempill.ProvenanceLabel.external_first_hand(),})
print(result["disposition"]) # e.g. "CommittedCheap" or "Superseded"print(result["claim_ref"]) # UUID of the winning claimVerdicts
Section titled “Verdicts”| Verdict | Engine action |
|---|---|
"Affirm" |
Challenger wins — incumbent is superseded; challenger is committed |
"Deny" |
Incumbent stands — challenger is parked as PendingConflict |
"Unknown" |
Cannot decide — claim stays Contested; removed from queue |
The verdict is committed atomically with External(ExternalFirstHand) provenance — the
submit call is itself an authoritative, auditable event in the ledger.
Sweeping expired adjudications
Section titled “Sweeping expired adjudications”Pending adjudications have a TTL. Expired ones return to Contested automatically, but
you can trigger a sweep explicitly (e.g. on startup or via a scheduled job):
swept = engine.sweep_expired_adjudications()# Returns int (count swept) or dict with "swept" keyprint(f"Swept {swept} expired adjudication(s) back to Contested")Verified end-to-end oracle flow
Section titled “Verified end-to-end oracle flow”The following was run against the live engine (demo venv, mempill 0.2.0):
ingest Alice: CommittedCheap ref: 58525121ingest Bob: QueuedForAdjudication ref: 383a9987pending count: 1 handle: 45c186e5 incumbent: Alice challenger: Bobbelief before resolve: status= Contestedsubmit_adjudication: {'handle_id': '45c186e5-...', 'disposition': 'CommittedCheap', 'claim_ref': '383a9987-...'}belief after resolve: status= Resolved primary value: BobThe HumanOracle pattern
Section titled “The HumanOracle pattern”The demo implements the canonical human-in-the-loop oracle in
mempill-demo/src/mempill_demo/adapters/human_oracle.py:
import uuid
class HumanOracle: """ Stateless oracle that queues every adjudication request for human review.
The engine is the durable store — this class only generates a handle_id. """
def request_adjudication(self, agent_id: str, request: dict) -> str: """Return a fresh UUID handle_id string; the engine records the rest.""" return str(uuid.uuid4())The oracle itself is intentionally stateless. The entire conflict state (subject, predicate, incumbent value, challenger value, queued_at timestamp) is stored by the engine in the durable pending queue. The oracle does not need to remember anything between calls.
The demo then wires it:
import mempillfrom mempill_demo.adapters.human_oracle import HumanOracle
oracle = HumanOracle()engine = mempill.open_oracle_in_memory(oracle)# or: engine = mempill.open_oracle(db_path, oracle)And provides a /review REPL command that calls list_pending_adjudications and
submit_adjudication. The review flow (app/review.py) maps user choices to verdicts:
| User input | Verdict | Meaning |
|---|---|---|
c / challenger |
Affirm |
Challenger wins |
i / incumbent |
Deny |
Incumbent stands |
a / abstain |
Unknown |
Stays Contested; removed from queue |
s / skip |
(nothing submitted) | Defer — ask again next /review |
Sketch: API/authoritative-source oracle
Section titled “Sketch: API/authoritative-source oracle”An oracle that consults an authoritative external source automatically (no human required):
import uuidimport httpx
class ApiOracle: """Oracle that resolves CEO claims against a corporate registry API."""
def __init__(self, registry_url: str, api_key: str) -> None: self._url = registry_url self._api_key = api_key # Store pending requests in a local dict for fast lookup on submit. # In production use a durable store; this is a sketch. self._pending: dict[str, dict] = {}
def request_adjudication(self, agent_id: str, request: dict) -> str: handle_id = str(uuid.uuid4()) # Kick off a non-blocking lookup; the engine calls this synchronously. # In production, enqueue to a worker thread or async task. self._pending[handle_id] = request return handle_id
def resolve_pending(self, engine, agent_id: str) -> None: """Call this from a background worker to process pending requests.""" for handle_id, request in list(self._pending.items()): verdict = self._consult_registry(request) engine.submit_adjudication({ "handle_id": handle_id, "verdict": verdict, "evidence_provenance": {"type": "External", "kind": "ExternalFirstHand"}, }) del self._pending[handle_id]
def _consult_registry(self, request: dict) -> str: """Return "Affirm", "Deny", or "Unknown" based on registry lookup.""" challenger = request.get("challenger_value") resp = httpx.get( f"{self._url}/verify", params={"value": challenger}, headers={"Authorization": f"Bearer {self._api_key}"}, timeout=5.0, ) if resp.status_code == 200 and resp.json().get("verified"): return "Affirm" return "Unknown"This sketch illustrates the pattern: request_adjudication is fast and synchronous (just
returns a handle); the actual API call happens asynchronously and calls submit_adjudication
when it completes.
Rust OraclePort trait
Section titled “Rust OraclePort trait”For Rust callers, the oracle is OraclePort from mempill-core/src/ports/:
pub trait OraclePort: Send + Sync + 'static { fn request_adjudication( &self, agent_id: &AgentId, request: &AdjudicationRequest, ) -> Result<HandleId, OracleError>;}Use open_with_oracle(path, Arc::new(my_oracle))? (SQLite) or
open_postgres_with_oracle(conn_str, Arc::new(my_oracle), ...) (Postgres).
Key invariants
Section titled “Key invariants”- I5: The oracle is a proposal generator, not a committer. It returns a handle; the engine performs the actual commit atomically. The oracle cannot write to the claim store directly.
- Durable queue: pending adjudications survive engine restart. The engine persists the full conflict context; the oracle only needs to remember the handle.
- Idempotent submit: calling
submit_adjudicationwith the samehandle_idtwice is safe.
Related guides
Section titled “Related guides”- Python (Advanced) —
open_oracle/open_oracle_in_memoryconstructors - Rust (Advanced) —
open_with_oracle/OracleEnginetype alias - PostgreSQL Backend —
open_postgres_with_oracle - Concepts: Oracle Loop — the engine-side oracle loop design
- Concepts: Human-in-the-Loop — HITL adjudication flow