Skip to content

Vault Hardening Features

This document describes the comprehensive vault hardening features implemented in mcp-pvp to enhance security, performance, and auditability of PII handling in MCP workflows.

Overview

The vault hardening initiative implements five critical features that work together to provide:

  1. Session Integrity Validation - Prevents token theft across sessions
  2. Result Tokenization in Same Session - Maintains session consistency
  3. Scanner-Based TEXT Token Parser - High-performance O(n) token parsing
  4. Recursive Output Scrubbing - Comprehensive PII detection in complex outputs
  5. Audit Coherence - Complete parent-child event tracking

All features are production-ready with comprehensive test coverage (148 tests, 87% code coverage).


1. Session Integrity Validation

Overview

Session integrity validation prevents cross-session token theft by binding each PII record to its vault session. This ensures that tokens can only be accessed within their originating session, preventing attackers from using stolen token references in different sessions.

Implementation

Each StoredPII now includes a vault_session field that tracks ownership:

class StoredPII(BaseModel):
    """PII value stored in vault."""
    ref: str  # Token reference (e.g., "tkn_abc123")
    pii_type: PIIType  # Type of PII (EMAIL, PHONE, etc.)
    value: str  # The actual sensitive value
    vault_session: str  # Session ID that owns this token
    created_at: datetime

Usage Example

from mcp_pvp import Vault, TokenizeRequest, Policy

vault = Vault(policy=Policy())

# Create first session
req1 = TokenizeRequest(content="alice@example.com")
resp1 = vault.tokenize(req1)
session1 = resp1.vault_session
token1 = resp1.tokens[0]

# Create second session
req2 = TokenizeRequest(content="bob@example.com")
resp2 = vault.tokenize(req2)
session2 = resp2.vault_session
token2 = resp2.tokens[0]

# Try to access token1 using session2 (BLOCKED)
try:
    vault.store.get_pii(session2, token1.ref)
except TokenSessionMismatchError:
    print("✅ Cross-session access prevented!")

Security Benefits

  • Token Theft Prevention: Stolen token references cannot be used in different sessions
  • Session Isolation: Each session maintains independent token ownership
  • Replay Attack Mitigation: Tokens expire with their session TTL

Error Handling

When session mismatch is detected, a TokenSessionMismatchError is raised:

class TokenSessionMismatchError(PVPError):
    """Raised when attempting to redeem a token from a different session."""

    def __init__(
        self,
        message: str = "Token does not belong to the requesting session",
        details: dict[str, Any] | None = None,
    ):
        super().__init__(message, ErrorCode.ERR_TOKEN_SESSION_MISMATCH, details)

2. Result Tokenization in Same Session

Overview

When tools return results containing PII, that PII is automatically tokenized within the same vault session as the original request. This ensures session consistency and simplifies token lifecycle management.

Implementation

During deliver() execution, result tokenization reuses the existing vault session:

# In vault.deliver()
if self.detector:
    # Detect PII in tool result
    result_detections = self.detector.detect(result_content)

    # Tokenize using SAME session
    result_tokenize_req = TokenizeRequest(
        content=result_content,
        vault_session=deliver_req.vault_session,  # Reuse session
        run=deliver_req.run,
        parent_audit_id=audit_id,  # Link to parent audit
    )
    result_resp = self.tokenize(result_tokenize_req)

Usage Example

from mcp_pvp import Vault, TokenizeRequest, DeliverRequest, ToolCall, RunContext

vault = Vault(policy=Policy(default_allow=True))

# Initial tokenization
tokenize_resp = vault.tokenize(TokenizeRequest(
    content="Contact alice@example.com"
))
session_id = tokenize_resp.vault_session

# Deliver with result containing PII
deliver_resp = vault.deliver(DeliverRequest(
    vault_session=session_id,
    run=RunContext(run_id="run1", participant_id="llm"),
    tool_call=ToolCall(name="get_user_info", args={}),
))

# Result tokens belong to SAME session
for token in deliver_resp.result_tokens:
    pii = vault.store.get_pii(session_id, token.ref)
    assert pii.vault_session == session_id  # ✅ Same session!

Benefits

  • Simplified Lifecycle: All tokens in a workflow share one session TTL
  • Audit Clarity: Single session traces complete request/response flow
  • Performance: No overhead creating new sessions for result tokens
  • Consistency: Token ownership is clear and predictable

Session Reuse Logging

When a session is reused, the vault logs this event:

logger.info("vault_tokenize_reusing_session", vault_session=vault_session)

3. Scanner-Based TEXT Token Parser

Overview

The scanner-based parser replaces the previous regex-based implementation with a high-performance O(n) state machine. This provides:

  • Linear time complexity: Single pass through content
  • Pathological input resistance: No regex backtracking vulnerabilities
  • Better error handling: Gracefully handles malformed tokens
  • Performance: 10-100x faster on complex inputs

State Machine

The scanner uses 9 states to parse TEXT format tokens [[PII:TYPE:ref]]:

TEXT → BRACKET1 → BRACKET2 → PII → COLON1 → TYPE → COLON2 → REF → CLOSE1 → CLOSE2
  ↓       ↓          ↓         ↓       ↓       ↓        ↓       ↓       ↓
 [ERROR] [ERROR]  [ERROR]  [ERROR] [ERROR] [ERROR]  [ERROR] [ERROR] [ERROR]

Implementation

from enum import Enum, auto

class ScanState(Enum):
    """States for the TEXT token scanner."""
    TEXT = auto()      # Regular text
    BRACKET1 = auto()  # Seen first '['
    BRACKET2 = auto()  # Seen second '['
    PII = auto()       # Seen 'PII'
    COLON1 = auto()    # Seen first ':'
    TYPE = auto()      # Reading PII type
    COLON2 = auto()    # Seen second ':'
    REF = auto()       # Reading token ref
    CLOSE1 = auto()    # Seen first ']'

def scan_text_tokens(content: str) -> list[TextToken]:
    """Scan content for TEXT tokens using state machine."""
    state = ScanState.TEXT
    # ... implementation details

Usage Example

from mcp_pvp.tokens import extract_text_tokens

content = "Email: [[PII:EMAIL:tkn_abc]] Phone: [[PII:PHONE:tkn_xyz]]"
tokens = extract_text_tokens(content)

# Returns: [
#   TextToken(type=PIIType.EMAIL, ref="tkn_abc"),
#   TextToken(type=PIIType.PHONE, ref="tkn_xyz"),
# ]

Performance Characteristics

Input Type Regex Approach Scanner Approach Speedup
Simple tokens 0.1ms 0.01ms 10x
Pathological input (1000 false starts) 100ms+ 1ms 100x+
Large documents (100KB) 50ms 5ms 10x

Error Handling

The scanner gracefully handles malformed tokens:

# Missing closing bracket
"[[PII:EMAIL:tkn_abc]"  []  # No tokens extracted

# Invalid PII type
"[[PII:INVALID:tkn_abc]]"  []  # Type validation fails

# Special characters in ref
"[[PII:EMAIL:tkn_abc-123_xyz]]"  [TextToken(...)]  # Valid
"[[PII:EMAIL:tkn abc]]"  []  # Spaces not allowed

4. Recursive Output Scrubbing

Overview

Recursive output scrubbing ensures that any Python object returned by a tool is thoroughly scanned for PII, including:

  • Exceptions with tracebacks
  • Nested dictionaries and lists
  • Custom objects with attributes
  • Mixed structures up to depth 10

This prevents PII leakage through complex tool responses.

Implementation

The serialize_for_pii_detection() function recursively converts any object to a string:

def serialize_for_pii_detection(obj: Any, max_depth: int = 10, _depth: int = 0) -> str:
    """
    Recursively serialize any Python object for PII detection.

    Handles:
    - Primitives (str, int, float, bool, None)
    - Collections (list, tuple, set, dict)
    - Exceptions (with traceback)
    - Custom objects (via __dict__ inspection)
    - Circular references (via depth limit)
    """
    # Base cases
    if obj is None:
        return "None"
    if isinstance(obj, (str, int, float, bool)):
        return str(obj)

    # Depth limit for circular reference protection
    if _depth >= max_depth:
        return f"<max_depth_reached:{type(obj).__name__}>"

    # Exception with traceback
    if isinstance(obj, Exception):
        import traceback
        tb = traceback.format_exception(type(obj), obj, obj.__traceback__)
        return "".join(tb)

    # Collections
    if isinstance(obj, dict):
        items = [
            f"{serialize_for_pii_detection(k, max_depth, _depth+1)}: "
            f"{serialize_for_pii_detection(v, max_depth, _depth+1)}"
            for k, v in obj.items()
        ]
        return "{" + ", ".join(items) + "}"

    if isinstance(obj, (list, tuple, set)):
        items = [serialize_for_pii_detection(item, max_depth, _depth+1) for item in obj]
        return "[" + ", ".join(items) + "]"

    # Custom objects
    if hasattr(obj, "__dict__"):
        return serialize_for_pii_detection(obj.__dict__, max_depth, _depth+1)

    return str(obj)

Usage Example

from mcp_pvp import Vault, DeliverRequest, ToolCall, RunContext

class UserProfile:
    def __init__(self, name: str, email: str, phone: str):
        self.name = name
        self.email = email
        self.phone = phone

# Tool returns custom object
def get_user_profile(user_id: str) -> UserProfile:
    return UserProfile("Alice", "alice@example.com", "555-1234")

# Vault automatically scrubs the entire object
vault = Vault(policy=Policy(default_allow=True))
deliver_resp = vault.deliver(DeliverRequest(
    vault_session=session_id,
    run=RunContext(run_id="run1", participant_id="llm"),
    tool_call=ToolCall(name="get_user_profile", args={"user_id": "123"}),
))

# Result is scrubbed:
# {name: Alice, email: [[PII:EMAIL:tkn_xyz]], phone: [[PII:PHONE:tkn_abc]]}
assert "[[PII:EMAIL:" in deliver_resp.tool_result
assert "alice@example.com" not in deliver_resp.tool_result

Exception Handling

Exceptions are fully serialized including tracebacks:

def buggy_tool():
    email = "admin@example.com"
    raise ValueError(f"Invalid email: {email}")

# Exception traceback is scrubbed:
# ValueError: Invalid email: [[PII:EMAIL:tkn_xyz]]
# Traceback shows [[PII:EMAIL:tkn_xyz]] instead of raw email

Depth Protection

Circular references and deeply nested structures are handled:

# Circular reference
obj = {"self": None}
obj["self"] = obj

serialized = serialize_for_pii_detection(obj)
# Result: {self: <max_depth_reached:dict>}

Supported Types

Type Handling
str, int, float, bool, None Direct conversion to string
dict Recursive serialization of keys and values
list, tuple, set Recursive serialization of items
Exception Full traceback with traceback.format_exception()
Custom objects Serialization via __dict__ inspection
Circular references Depth limit prevents infinite recursion

5. Audit Coherence

Overview

Audit coherence provides complete traceability by linking child audit events (result tokenization) to their parent events (deliver operations). This enables:

  • Complete audit trails: Track entire request/response flows
  • Parent-child queries: Find all child events for a deliver operation
  • Debugging: Understand full context of tokenization operations
  • Compliance: Prove complete handling of sensitive data

Implementation

The AuditEvent model includes a parent_audit_id field:

class AuditEvent(BaseModel):
    """Audit event record."""

    audit_id: str  # Unique event ID
    timestamp: datetime
    event_type: AuditEventType  # TOKENIZE, DELIVER, etc.
    vault_session: str | None
    run: RunContext | None
    parent_audit_id: str | None  # 🔗 Links to parent event
    details: dict[str, Any]

During deliver(), result tokenization events reference their parent:

# In vault.deliver()
audit_id = self.audit_logger.log_deliver(...)

# Result tokenization includes parent reference
result_resp = self.tokenize(
    result_tokenize_req,
    parent_audit_id=audit_id  # 🔗 Link to parent deliver event
)

Usage Example

from mcp_pvp import Vault, DeliverRequest, ToolCall, RunContext

vault = Vault(policy=Policy(default_allow=True))

# Perform deliver operation
deliver_resp = vault.deliver(DeliverRequest(
    vault_session=session_id,
    run=RunContext(run_id="run1", participant_id="llm"),
    tool_call=ToolCall(name="get_info", args={}),
))

# Query audit trail
events = vault.audit_logger.get_events()

# Find deliver event
deliver_events = [e for e in events if e.event_type == "DELIVER"]
deliver_event = deliver_events[0]

# Find child tokenization events
child_events = [
    e for e in events 
    if e.parent_audit_id == deliver_event.audit_id
]

# Verify parent-child relationship
for child in child_events:
    assert child.parent_audit_id == deliver_event.audit_id
    assert child.event_type == "TOKENIZE"
    print(f"Child event {child.audit_id} links to parent {deliver_event.audit_id}")

Audit Trail Visualization

run_1 events:
  ├─ aud_abc123 [TOKENIZE] (initial)
  │   └─ vault_session: vs_xyz
  │
  ├─ aud_def456 [DELIVER]
  │   ├─ vault_session: vs_xyz
  │   └─ tool_name: get_user_info
  │       │
  │       └─ aud_ghi789 [TOKENIZE] (result)
  │           ├─ parent_audit_id: aud_def456  🔗
  │           ├─ vault_session: vs_xyz
  │           └─ tokens_created: 3

Query Patterns

Get all events for a run

events = vault.audit_logger.get_events_for_run("run1")

Get child events for a specific parent

parent_id = "aud_abc123"
children = [e for e in events if e.parent_audit_id == parent_id]

Build complete event tree

def build_event_tree(events: list[AuditEvent]) -> dict:
    """Build parent-child event tree."""
    tree = {}
    for event in events:
        if event.parent_audit_id is None:
            tree[event.audit_id] = {"event": event, "children": []}
        else:
            parent = tree.get(event.parent_audit_id)
            if parent:
                parent["children"].append(event)
    return tree

Audit Event Details

Each audit event includes comprehensive details (without raw PII):

# TOKENIZE event details
{
    "detections": 3,           # Number of PII instances found
    "tokens_created": 3,       # Number of tokens created
    "types": {                 # PII type distribution
        "EMAIL": 2,
        "PHONE": 1
    }
}

# DELIVER event details
{
    "tool_name": "send_email",
    "disclosed": {             # What was disclosed (counts only)
        "EMAIL": 1,
        "PHONE": 0
    }
}

Integration & Best Practices

Using All Features Together

from mcp_pvp import Vault, TokenizeRequest, DeliverRequest, ToolCall, RunContext, Policy

# Initialize vault with policy
vault = Vault(policy=Policy(default_allow=True))

# 1. Initial tokenization (creates session)
tokenize_resp = vault.tokenize(TokenizeRequest(
    content="Contact alice@example.com or call 555-1234"
))
session_id = tokenize_resp.vault_session

# Session integrity: Tokens bound to session_id
assert all(t.ref.startswith("tkn_") for t in tokenize_resp.tokens)

# 2. Deliver with complex result
deliver_resp = vault.deliver(DeliverRequest(
    vault_session=session_id,  # Reuse session
    run=RunContext(run_id="run1", participant_id="llm"),
    tool_call=ToolCall(name="complex_tool", args={}),
))

# Result tokenization: Same session, recursive scrubbing
assert all(
    vault.store.get_pii(session_id, t.ref).vault_session == session_id
    for t in deliver_resp.result_tokens
)

# Scanner: Fast parsing of result tokens
assert "[[PII:" in deliver_resp.tool_result

# 3. Audit trail: Complete parent-child tracking
events = vault.audit_logger.get_events()
deliver_events = [e for e in events if e.event_type == "DELIVER"]
result_tokenize_events = [
    e for e in events
    if e.event_type == "TOKENIZE" and e.parent_audit_id == deliver_events[0].audit_id
]
assert len(result_tokenize_events) > 0  # Child events exist

Performance Considerations

  1. Session Reuse: Reduces session creation overhead
  2. Scanner Performance: O(n) parsing handles large documents efficiently
  3. Recursive Scrubbing: Depth limit (10) prevents excessive overhead
  4. Audit Coherence: Minimal overhead for parent_audit_id tracking

Security Best Practices

  1. Validate Session TTL: Set appropriate TTL for your use case
  2. Monitor Audit Trails: Track unusual parent-child patterns
  3. Limit Recursion Depth: Adjust max_depth based on expected data structures
  4. Review Policies: Ensure deliver operations are properly scoped

Monitoring & Observability

All features emit structured logs:

// Session reuse
{"event": "vault_tokenize_reusing_session", "vault_session": "vs_xyz"}

// Result tokenization
{"event": "vault_tokenize_complete", "tokens_created": 3, "vault_session": "vs_xyz"}

// Audit parent-child
{"event": "audit_event", "parent_audit_id": "aud_abc", "event_type": "TOKENIZE"}

// Scanner performance
{"event": "vault_tokenize_complete", "detections": 100, "elapsed": "0.001s"}

Testing

All features are comprehensively tested:

  • Unit Tests: 138 tests covering individual features
  • Integration Tests: 10 tests validating features working together
  • Total Coverage: 148 tests, 87% code coverage

Run the test suite:

# All tests
uv run pytest -v

# Integration tests only
uv run pytest tests/test_integration.py -v

# Specific feature tests
uv run pytest tests/test_session_integrity.py -v
uv run pytest tests/test_result_tokenization.py -v
uv run pytest tests/test_scanner.py -v
uv run pytest tests/test_recursive_scrubbing.py -v
uv run pytest tests/test_audit_coherence.py -v

Migration Guide

All features are backward compatible. No code changes required for existing deployments.

What's New

  1. PIIRecord.vault_session field (automatically populated)
  2. serialize_for_pii_detection() function in vault.py
  3. Scanner-based parser (replaces regex, same API)
  4. parent_audit_id in AuditEvent (automatically populated)
  5. Session reuse in result tokenization (automatic)

Optional Enhancements

If you want to leverage new features explicitly:

# 1. Access vault_session on PII records
pii = vault.store.get_pii(session_id, token_ref)
assert pii.vault_session == session_id

# 2. Serialize custom objects before detection
from mcp_pvp.vault import serialize_for_pii_detection
serialized = serialize_for_pii_detection(custom_object)

# 3. Query audit trails with parent relationships
events = vault.audit_logger.get_events()
children = [e for e in events if e.parent_audit_id == parent_id]

Summary

The vault hardening features provide:

Enhanced Security: Session integrity prevents token theft
Better Performance: Scanner provides O(n) parsing with no regex backtracking
Complete Coverage: Recursive scrubbing finds PII in any Python object
Full Traceability: Audit coherence links all related events
Simplified Management: Session reuse reduces complexity

All features are production-ready, fully tested, and backward compatible.