Skip to content

Writing an Oracle

An oracle is the component that resolves conflicts between competing claims when the engine cannot resolve them deterministically. This is mempill’s primary differentiator: conflict resolution is pluggable, auditable, and human-overridable — not hidden inside a statistical model.

For the conceptual foundation see Concepts: Oracle Loop and Concepts: Human-in-the-Loop.

The Python oracle is duck-typed: any Python object with a single method qualifies. No base class or import from mempill is required:

def request_adjudication(self, agent_id: str, request: dict) -> str:
...
Parameter Type Description
agent_id str The agent whose claim triggered the conflict
request dict Conflict context (subject, predicate, incumbent, challenger values)
Return str A handle_id UUID string — the engine stores it in the durable pending queue

The oracle returns a handle_id immediately. It does not decide the verdict at call time. The verdict arrives later via engine.submit_adjudication(...). The engine owns the durable queue; the oracle only generates the handle.

open_oracle_in_memory / open_oracle
import mempill
class MyOracle:
def request_adjudication(self, agent_id: str, request: dict) -> str:
import uuid
# Log the request, push to a queue, etc.
# Return a fresh handle_id — the engine records the rest.
return str(uuid.uuid4())
oracle = MyOracle()
# Ephemeral in-memory engine with oracle:
engine = mempill.open_oracle_in_memory(oracle)
# Or file-backed:
engine = mempill.open_oracle("/path/to/agent.db", oracle)

These constructors (from mempill/__init__.py) return OracleEngine (PyOracleEngine), which exposes all the regular engine methods plus the three oracle-specific methods: list_pending_adjudications, submit_adjudication, and sweep_expired_adjudications.

  1. Agent ingests a claim that conflicts with an existing committed claim.
  2. The AdjudicationGate (C7) detects the conflict and calls oracle.request_adjudication(agent_id, request).
  3. The oracle returns a handle_id.
  4. The engine writes a PendingAdjudicationRow to the durable pending store (survives restart).
  5. The ingest response returns disposition = "QueuedForAdjudication".
  6. query_memory returns status = "Contested" with both values in alternatives.
  7. A later call to submit_adjudication resolves the conflict.
pending = engine.list_pending_adjudications(agent_id="my-agent")
for item in pending:
print(item["handle_id"]) # UUID string
print(item["subject"]) # e.g. "acme:ceo"
print(item["predicate"]) # e.g. "held_by"
print(item["incumbent_value"]) # the existing committed value
print(item["challenger_value"]) # the new conflicting value
print(item["queued_at"]) # ISO-8601 UTC timestamp
result = engine.submit_adjudication({
"handle_id": "45c186e5-...", # UUID from list_pending_adjudications
"verdict": "Affirm", # "Affirm" | "Deny" | "Unknown"
"evidence_provenance": mempill.ProvenanceLabel.external_first_hand(),
})
print(result["disposition"]) # e.g. "CommittedCheap" or "Superseded"
print(result["claim_ref"]) # UUID of the winning claim
Verdict Engine action
"Affirm" Challenger wins — incumbent is superseded; challenger is committed
"Deny" Incumbent stands — challenger is parked as PendingConflict
"Unknown" Cannot decide — claim stays Contested; removed from queue

The verdict is committed atomically with External(ExternalFirstHand) provenance — the submit call is itself an authoritative, auditable event in the ledger.

Pending adjudications have a TTL. Expired ones return to Contested automatically, but you can trigger a sweep explicitly (e.g. on startup or via a scheduled job):

swept = engine.sweep_expired_adjudications()
# Returns int (count swept) or dict with "swept" key
print(f"Swept {swept} expired adjudication(s) back to Contested")

The following was run against the live engine (demo venv, mempill 0.2.0):

ingest Alice: CommittedCheap ref: 58525121
ingest Bob: QueuedForAdjudication ref: 383a9987
pending count: 1
handle: 45c186e5 incumbent: Alice challenger: Bob
belief before resolve: status= Contested
submit_adjudication: {'handle_id': '45c186e5-...', 'disposition': 'CommittedCheap', 'claim_ref': '383a9987-...'}
belief after resolve: status= Resolved primary value: Bob

The demo implements the canonical human-in-the-loop oracle in mempill-demo/src/mempill_demo/adapters/human_oracle.py:

human_oracle.py (from the demo)
import uuid
class HumanOracle:
"""
Stateless oracle that queues every adjudication request for human review.
The engine is the durable store — this class only generates a handle_id.
"""
def request_adjudication(self, agent_id: str, request: dict) -> str:
"""Return a fresh UUID handle_id string; the engine records the rest."""
return str(uuid.uuid4())

The oracle itself is intentionally stateless. The entire conflict state (subject, predicate, incumbent value, challenger value, queued_at timestamp) is stored by the engine in the durable pending queue. The oracle does not need to remember anything between calls.

The demo then wires it:

memory_mempill.py (from the demo)
import mempill
from mempill_demo.adapters.human_oracle import HumanOracle
oracle = HumanOracle()
engine = mempill.open_oracle_in_memory(oracle)
# or: engine = mempill.open_oracle(db_path, oracle)

And provides a /review REPL command that calls list_pending_adjudications and submit_adjudication. The review flow (app/review.py) maps user choices to verdicts:

User input Verdict Meaning
c / challenger Affirm Challenger wins
i / incumbent Deny Incumbent stands
a / abstain Unknown Stays Contested; removed from queue
s / skip (nothing submitted) Defer — ask again next /review

An oracle that consults an authoritative external source automatically (no human required):

api_oracle.py (sketch)
import uuid
import httpx
class ApiOracle:
"""Oracle that resolves CEO claims against a corporate registry API."""
def __init__(self, registry_url: str, api_key: str) -> None:
self._url = registry_url
self._api_key = api_key
# Store pending requests in a local dict for fast lookup on submit.
# In production use a durable store; this is a sketch.
self._pending: dict[str, dict] = {}
def request_adjudication(self, agent_id: str, request: dict) -> str:
handle_id = str(uuid.uuid4())
# Kick off a non-blocking lookup; the engine calls this synchronously.
# In production, enqueue to a worker thread or async task.
self._pending[handle_id] = request
return handle_id
def resolve_pending(self, engine, agent_id: str) -> None:
"""Call this from a background worker to process pending requests."""
for handle_id, request in list(self._pending.items()):
verdict = self._consult_registry(request)
engine.submit_adjudication({
"handle_id": handle_id,
"verdict": verdict,
"evidence_provenance": {"type": "External", "kind": "ExternalFirstHand"},
})
del self._pending[handle_id]
def _consult_registry(self, request: dict) -> str:
"""Return "Affirm", "Deny", or "Unknown" based on registry lookup."""
challenger = request.get("challenger_value")
resp = httpx.get(
f"{self._url}/verify",
params={"value": challenger},
headers={"Authorization": f"Bearer {self._api_key}"},
timeout=5.0,
)
if resp.status_code == 200 and resp.json().get("verified"):
return "Affirm"
return "Unknown"

This sketch illustrates the pattern: request_adjudication is fast and synchronous (just returns a handle); the actual API call happens asynchronously and calls submit_adjudication when it completes.

For Rust callers, the oracle is OraclePort from mempill-core/src/ports/:

pub trait OraclePort: Send + Sync + 'static {
fn request_adjudication(
&self,
agent_id: &AgentId,
request: &AdjudicationRequest,
) -> Result<HandleId, OracleError>;
}

Use open_with_oracle(path, Arc::new(my_oracle))? (SQLite) or open_postgres_with_oracle(conn_str, Arc::new(my_oracle), ...) (Postgres).

  • I5: The oracle is a proposal generator, not a committer. It returns a handle; the engine performs the actual commit atomically. The oracle cannot write to the claim store directly.
  • Durable queue: pending adjudications survive engine restart. The engine persists the full conflict context; the oracle only needs to remember the handle.
  • Idempotent submit: calling submit_adjudication with the same handle_id twice is safe.