ppxai Architecture Refactoring Plan¶
Overview¶
This document outlines the comprehensive plan to refactor ppxai into a clean, modular architecture with clear separation between:
- Engine - Core business logic (providers, tools, models, sessions)
- Clients - Frontend interfaces (TUI, VSCode Extension, Web UI)
- Configuration - Provider and tool definitions
Design Principles¶
1. Tool Independence¶
- Tools can be enhanced or updated independently from providers
- Tools are registered in a central registry
- Provider-specific tools can be conditionally loaded
- Some tools may not make sense for all providers (e.g., Perplexity has built-in search)
2. Provider Separation¶
- Each provider (Perplexity, OpenAI, Gemini, OpenRouter, Local) is isolated
- Providers loaded dynamically based on user selection
- Provider-specific capabilities are declared in configuration
- Provider can define which tools to enable/disable
3. Model Management¶
- Models listed after provider selection
- Model list comes from configuration or provider's API (if available)
- Optional: Refresh model list from provider's listing API
4. Client Uniformity¶
- All clients (TUI, VSCode, Web) use the same engine API
- Same functionality available everywhere
- Clients receive events/data, not formatted output
Proposed Architecture¶
ppxai/
├── engine/ # Core engine (no UI dependencies)
│ ├── __init__.py # Public API exports
│ ├── types.py # Shared types/dataclasses
│ ├── config.py # Configuration loader
│ ├── providers/ # Provider implementations
│ │ ├── __init__.py # Provider registry & factory
│ │ ├── base.py # BaseProvider abstract class
│ │ ├── perplexity.py # Perplexity provider
│ │ ├── openai_provider.py # OpenAI/ChatGPT provider
│ │ ├── openrouter.py # OpenRouter provider
│ │ └── local.py # Local models (Ollama, vLLM)
│ ├── tools/ # Tool system
│ │ ├── __init__.py # Tool registry
│ │ ├── base.py # BaseTool abstract class
│ │ ├── manager.py # Tool manager (load/execute)
│ │ ├── builtin/ # Built-in tools
│ │ │ ├── __init__.py
│ │ │ ├── filesystem.py # read_file, search_files, list_directory
│ │ │ ├── shell.py # execute_shell_command
│ │ │ ├── calculator.py # calculator
│ │ │ ├── datetime_tool.py # get_datetime
│ │ │ └── web.py # web_search, fetch_url, get_weather
│ │ └── mcp/ # MCP tool integration (optional)
│ │ └── __init__.py
│ ├── session.py # Session management
│ ├── prompts.py # System prompts & templates
│ └── client.py # Main engine client (facade)
│
├── server/ # Server interfaces
│ ├── __init__.py
│ ├── jsonrpc.py # JSON-RPC over stdio
│ ├── http.py # HTTP/REST server (optional)
│ └── websocket.py # WebSocket server (optional)
│
├── tui/ # Terminal UI client
│ ├── __init__.py
│ ├── main.py # TUI entry point
│ ├── ui.py # Rich console components
│ └── commands.py # Slash command handlers
│
├── ppxai.py # CLI entry point
└── __main__.py # Module entry point
Component Details¶
Engine Layer (engine/)¶
engine/types.py - Shared Types¶
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from enum import Enum
class EventType(Enum):
STREAM_START = "stream_start"
STREAM_CHUNK = "stream_chunk"
STREAM_END = "stream_end"
TOOL_CALL = "tool_call"
TOOL_RESULT = "tool_result"
ERROR = "error"
@dataclass
class Message:
role: str # 'user', 'assistant', 'system'
content: str
@dataclass
class UsageStats:
prompt_tokens: int = 0
completion_tokens: int = 0
total_tokens: int = 0
estimated_cost: float = 0.0
@dataclass
class ChatResponse:
content: str
citations: List[str] = field(default_factory=list)
usage: Optional[UsageStats] = None
@dataclass
class Event:
type: EventType
data: Any = None
@dataclass
class ProviderCapabilities:
web_search: bool = False
web_fetch: bool = False
weather: bool = False
citations: bool = False
streaming: bool = True
@dataclass
class ToolDefinition:
name: str
description: str
parameters: Dict[str, Any]
handler: callable
provider_specific: Optional[List[str]] = None # If set, only for these providers
engine/providers/base.py - Base Provider¶
from abc import ABC, abstractmethod
from typing import List, Dict, AsyncIterator, Optional
from ..types import Message, ChatResponse, Event, ProviderCapabilities
class BaseProvider(ABC):
"""Abstract base class for all AI providers."""
name: str
capabilities: ProviderCapabilities
@abstractmethod
def __init__(self, api_key: str, base_url: str, **kwargs):
pass
@abstractmethod
async def chat(
self,
messages: List[Message],
model: str,
stream: bool = False
) -> AsyncIterator[Event]:
"""Send chat request, yielding events."""
pass
@abstractmethod
def list_models(self) -> List[Dict[str, str]]:
"""Return available models for this provider."""
pass
@abstractmethod
def validate_config(self) -> bool:
"""Validate provider configuration."""
pass
def needs_tool(self, tool_category: str) -> bool:
"""Check if provider needs a tool (doesn't have native capability)."""
return not getattr(self.capabilities, tool_category, False)
engine/providers/__init__.py - Provider Registry¶
from typing import Dict, Type, Optional
from .base import BaseProvider
_providers: Dict[str, Type[BaseProvider]] = {}
def register_provider(name: str, provider_class: Type[BaseProvider]):
"""Register a provider implementation."""
_providers[name] = provider_class
def get_provider(name: str, **kwargs) -> Optional[BaseProvider]:
"""Get an instance of a provider by name."""
if name not in _providers:
return None
return _providers[name](**kwargs)
def list_providers() -> List[str]:
"""List all registered provider names."""
return list(_providers.keys())
# Auto-register built-in providers
from .perplexity import PerplexityProvider
from .openai_provider import OpenAIProvider
from .openrouter import OpenRouterProvider
from .local import LocalProvider
register_provider("perplexity", PerplexityProvider)
register_provider("openai", OpenAIProvider)
register_provider("openrouter", OpenRouterProvider)
register_provider("local", LocalProvider)
engine/tools/base.py - Base Tool¶
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, List
class BaseTool(ABC):
"""Abstract base class for all tools."""
name: str
description: str
parameters: Dict[str, Any]
provider_specific: Optional[List[str]] = None # Only for these providers (None = all)
@abstractmethod
async def execute(self, **kwargs) -> str:
"""Execute the tool and return result."""
pass
def is_available_for(self, provider: str) -> bool:
"""Check if tool is available for a specific provider."""
if self.provider_specific is None:
return True
return provider in self.provider_specific
engine/tools/manager.py - Tool Manager¶
from typing import Dict, List, Optional, Any
from .base import BaseTool
class ToolManager:
"""Manages tool registration and execution."""
def __init__(self):
self._tools: Dict[str, BaseTool] = {}
self._provider: Optional[str] = None
def register_tool(self, tool: BaseTool):
"""Register a tool."""
self._tools[tool.name] = tool
def set_provider(self, provider: str):
"""Set current provider (filters available tools)."""
self._provider = provider
def get_available_tools(self) -> List[BaseTool]:
"""Get tools available for current provider."""
if self._provider is None:
return list(self._tools.values())
return [t for t in self._tools.values() if t.is_available_for(self._provider)]
def get_tool(self, name: str) -> Optional[BaseTool]:
"""Get a specific tool by name."""
tool = self._tools.get(name)
if tool and tool.is_available_for(self._provider):
return tool
return None
async def execute_tool(self, name: str, **kwargs) -> str:
"""Execute a tool by name."""
tool = self.get_tool(name)
if not tool:
raise ValueError(f"Tool not found or not available: {name}")
return await tool.execute(**kwargs)
def get_tools_prompt(self) -> str:
"""Generate system prompt describing available tools."""
# ... generates prompt for current provider's available tools
engine/client.py - Main Engine Client (Facade)¶
from typing import List, AsyncIterator, Optional, Dict, Any
from .types import Message, Event, EventType, ChatResponse
from .providers import get_provider
from .tools.manager import ToolManager
from .session import SessionManager
from .config import load_config
class EngineClient:
"""Main engine client - the facade for all engine functionality.
This is the primary interface for all frontends (TUI, VSCode, Web).
"""
def __init__(self):
self.provider = None
self.provider_name: str = ""
self.model: str = ""
self.tool_manager = ToolManager()
self.session_manager = SessionManager()
self.tools_enabled: bool = False
self.config = load_config()
# === Provider Management ===
def set_provider(self, provider_name: str) -> bool:
"""Switch to a different provider."""
config = self.config.get_provider(provider_name)
if not config:
return False
self.provider = get_provider(
provider_name,
api_key=config['api_key'],
base_url=config['base_url']
)
self.provider_name = provider_name
self.tool_manager.set_provider(provider_name)
return True
def list_providers(self) -> List[Dict[str, Any]]:
"""List available providers with their status."""
return self.config.list_providers()
# === Model Management ===
def set_model(self, model_id: str) -> bool:
"""Set the current model."""
models = self.list_models()
if any(m['id'] == model_id for m in models):
self.model = model_id
return True
return False
def list_models(self) -> List[Dict[str, str]]:
"""List available models for current provider."""
if not self.provider:
return []
return self.provider.list_models()
async def refresh_models(self) -> List[Dict[str, str]]:
"""Refresh model list from provider API (if supported)."""
if hasattr(self.provider, 'fetch_models'):
return await self.provider.fetch_models()
return self.list_models()
# === Tool Management ===
def enable_tools(self) -> bool:
"""Enable tool support."""
self.tools_enabled = True
return True
def disable_tools(self) -> bool:
"""Disable tool support."""
self.tools_enabled = False
return True
def list_tools(self) -> List[Dict[str, Any]]:
"""List available tools for current provider."""
return [
{"name": t.name, "description": t.description}
for t in self.tool_manager.get_available_tools()
]
def set_tool_config(self, setting: str, value: Any) -> bool:
"""Configure tool settings."""
if setting == "max_iterations":
self.tool_manager.max_iterations = int(value)
return True
return False
# === Chat ===
async def chat(
self,
message: str,
stream: bool = True
) -> AsyncIterator[Event]:
"""Send a chat message, yielding events.
Events include:
- STREAM_START: Chat started
- STREAM_CHUNK: Partial response (for streaming)
- TOOL_CALL: Tool being called
- TOOL_RESULT: Tool result
- STREAM_END: Final response
- ERROR: Error occurred
"""
if not self.provider:
yield Event(EventType.ERROR, "No provider configured")
return
# Add message to history
self.session_manager.add_message(Message("user", message))
if self.tools_enabled:
async for event in self._chat_with_tools(message, stream):
yield event
else:
async for event in self._chat_simple(message, stream):
yield event
async def _chat_simple(self, message: str, stream: bool) -> AsyncIterator[Event]:
"""Simple chat without tools."""
messages = self.session_manager.get_messages()
async for event in self.provider.chat(messages, self.model, stream):
yield event
if event.type == EventType.STREAM_END:
self.session_manager.add_message(Message("assistant", event.data))
async def _chat_with_tools(self, message: str, stream: bool) -> AsyncIterator[Event]:
"""Chat with tool support."""
# Build messages with tool prompt
messages = self.session_manager.get_messages()
tool_prompt = self.tool_manager.get_tools_prompt()
# ... tool calling loop with max_iterations
# === Coding Tasks ===
async def coding_task(
self,
task_type: str,
content: str,
language: Optional[str] = None,
filename: Optional[str] = None
) -> AsyncIterator[Event]:
"""Execute a coding task (explain, test, docs, debug, implement)."""
# ... similar to chat but with coding prompts
# === Session Management ===
def save_session(self, name: Optional[str] = None) -> str:
"""Save current session, return session name."""
return self.session_manager.save(name)
def load_session(self, name: str) -> bool:
"""Load a saved session."""
return self.session_manager.load(name)
def list_sessions(self) -> List[Dict[str, Any]]:
"""List saved sessions."""
return self.session_manager.list_sessions()
def clear_history(self):
"""Clear conversation history."""
self.session_manager.clear()
def get_usage(self) -> Dict[str, Any]:
"""Get usage statistics."""
return self.session_manager.get_usage()
# === Status ===
def get_status(self) -> Dict[str, Any]:
"""Get current engine status."""
return {
"provider": self.provider_name,
"model": self.model,
"tools_enabled": self.tools_enabled,
"tool_count": len(self.tool_manager.get_available_tools()),
"has_api_key": self.provider is not None
}
Server Layer (server/)¶
The server layer provides multiple transport options for client communication:
Transport Options¶
| Transport | File | Use Case | Status |
|---|---|---|---|
| JSON-RPC/stdio | jsonrpc.py |
Legacy, subprocess | ✅ Implemented |
| HTTP + SSE | http.py |
Recommended, low latency | 🚧 Planned |
| WebSocket | websocket.py |
Bidirectional, real-time | 📋 Optional |
server/http.py - HTTP Server with SSE (Recommended)¶
Why SSE over JSON-RPC? See docs/archive/sse-migration-plan.md for detailed analysis.
| Metric | JSON-RPC/stdio | HTTP + SSE |
|---|---|---|
| First token latency | 50-200ms | 10-30ms |
| Per-request overhead | JSON parse + asyncio.run | Native async |
| Reconnection | Manual | Built-in |
| Cancellation | Kill process | AbortController |
| Debug tooling | Custom | Browser DevTools |
"""HTTP server with SSE streaming for ppxai engine."""
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import json
from ..engine import EngineClient, EventType
app = FastAPI()
@app.post("/chat")
async def chat(request: Request):
"""Chat endpoint with Server-Sent Events streaming."""
body = await request.json()
message = body.get("message", "")
async def generate():
async for event in request.app.state.engine.chat(message, stream=True):
data = {"type": event.type.value, "data": event.data}
yield f"data: {json.dumps(data)}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
# REST endpoints for configuration
@app.get("/providers")
async def get_providers(request: Request):
return [{"id": p.id, "name": p.name, "has_api_key": p.has_api_key}
for p in request.app.state.engine.list_providers()]
@app.post("/provider")
async def set_provider(request: Request):
body = await request.json()
return {"success": request.app.state.engine.set_provider(body["provider"])}
@app.get("/status")
async def get_status(request: Request):
return request.app.state.engine.get_status()
server/jsonrpc.py - JSON-RPC Server (Legacy)¶
Retained for backward compatibility with existing integrations.
"""JSON-RPC 2.0 server over stdio.
Provides the same EngineClient functionality over JSON-RPC,
enabling integration with VSCode extension and other clients.
NOTE: For new integrations, prefer HTTP + SSE (server/http.py)
for better performance. See docs/sse-migration-plan.md.
"""
import json
import sys
import asyncio
from typing import Dict, Any
from ..engine.client import EngineClient
from ..engine.types import EventType
class JsonRpcServer:
def __init__(self):
self.engine = EngineClient()
self.methods = {
# Provider
"set_provider": self._set_provider,
"list_providers": self._list_providers,
# Model
"set_model": self._set_model,
"list_models": self._list_models,
"refresh_models": self._refresh_models,
# Tools
"enable_tools": self._enable_tools,
"disable_tools": self._disable_tools,
"list_tools": self._list_tools,
"set_tool_config": self._set_tool_config,
# Chat
"chat": self._chat,
"coding_task": self._coding_task,
# Session
"save_session": self._save_session,
"load_session": self._load_session,
"list_sessions": self._list_sessions,
"clear_history": self._clear_history,
"get_usage": self._get_usage,
# Status
"get_status": self._get_status,
}
async def _chat(self, message: str, stream: bool = True) -> Dict[str, Any]:
"""Handle chat request, sending stream events if requested."""
events = []
async for event in self.engine.chat(message, stream):
if stream:
self._send_stream_event(event)
events.append(event)
# Return final response
final = next((e for e in reversed(events) if e.type == EventType.STREAM_END), None)
return {"content": final.data if final else ""}
def _send_stream_event(self, event):
"""Send streaming event to stdout."""
print(json.dumps({
"type": "stream",
"event": event.type.value,
"data": event.data
}), flush=True)
def run(self):
"""Main server loop."""
print(json.dumps({"type": "ready"}), flush=True)
for line in sys.stdin:
request = json.loads(line.strip())
response = asyncio.run(self.handle_request(request))
print(json.dumps(response), flush=True)
TUI Layer (tui/)¶
The TUI consumes events from EngineClient and renders them using Rich:
# tui/main.py
from ..engine.client import EngineClient
from ..engine.types import EventType
from .ui import render_chunk, render_tool_call, render_error
async def main():
engine = EngineClient()
# Provider selection
providers = engine.list_providers()
selected = select_provider(providers)
engine.set_provider(selected)
# Model selection
models = engine.list_models()
selected = select_model(models)
engine.set_model(selected)
# Main loop
while True:
user_input = prompt("You: ")
async for event in engine.chat(user_input):
if event.type == EventType.STREAM_CHUNK:
render_chunk(event.data)
elif event.type == EventType.TOOL_CALL:
render_tool_call(event.data)
elif event.type == EventType.TOOL_RESULT:
render_tool_result(event.data)
elif event.type == EventType.ERROR:
render_error(event.data)
elif event.type == EventType.STREAM_END:
render_complete(event.data)
Configuration System¶
ppxai-config.json Structure¶
{
"providers": {
"perplexity": {
"name": "Perplexity AI",
"base_url": "https://api.perplexity.ai",
"api_key_env": "PERPLEXITY_API_KEY",
"capabilities": {
"web_search": true,
"web_fetch": true,
"citations": true
},
"default_model": "sonar-pro",
"coding_model": "sonar-pro",
"models": {
"sonar-pro": {"name": "Sonar Pro", "description": "Best for complex queries"},
"sonar": {"name": "Sonar", "description": "Fast general-purpose"}
},
"tools": {
"disabled": ["web_search", "fetch_url"]
}
},
"openai": {
"name": "OpenAI",
"base_url": "https://api.openai.com/v1",
"api_key_env": "OPENAI_API_KEY",
"capabilities": {
"web_search": false,
"web_fetch": false
},
"default_model": "gpt-4o",
"models": {
"gpt-4o": {"name": "GPT-4o", "description": "Most capable"},
"gpt-4o-mini": {"name": "GPT-4o Mini", "description": "Fast and efficient"}
},
"tools": {
"enabled": ["all"]
}
},
"custom": {
"name": "Custom Provider",
"base_url": "${CUSTOM_BASE_URL}",
"api_key_env": "CUSTOM_API_KEY",
"capabilities": {},
"tools": {
"enabled": ["all"],
"additional": ["custom_tool_x"]
}
}
},
"tools": {
"global_max_iterations": 15,
"custom_tools_dir": "~/.ppxai/tools/"
}
}
Implementation Phases¶
Phase 1: Create Engine Types & Structure ✅¶
- Create
engine/types.pywith all shared types - Create
engine/providers/base.pyabstract class - Create
engine/tools/base.pyabstract class - Create directory structure
Phase 2: Refactor Providers ✅¶
- Extract
PerplexityProviderfrom current client - Extract
OpenAIProvider(generic OpenAI-compatible) - Create provider registry
Phase 3: Refactor Tools ✅¶
- Move tool definitions to
engine/tools/builtin/ - Create
ToolManagerwith provider-aware filtering - Remove Rich console dependencies from tool execution
Phase 4: Create Engine Client ✅¶
- Implement
EngineClientfacade - Event-based output (no console printing)
- Async iterators for streaming
Phase 5: Update Server ✅¶
- Refactor
server.pyto useEngineClient - All methods delegate to engine
- Stream events via JSON
Phase 6: Update TUI ✅¶
- Refactor TUI to consume events from
EngineClient - All Rich rendering in TUI layer only
Phase 7: Update VSCode Extension ✅¶
- Connect to JSON-RPC server
- Remove duplicate TypeScript AI client
- Use Python backend for all functionality
Phase 8: HTTP + SSE Migration 🚧¶
Goal: Replace JSON-RPC/stdio with HTTP + SSE for improved streaming performance.
See docs/archive/sse-migration-plan.md for detailed implementation plan.
Summary:
1. Add FastAPI + uvicorn dependencies
2. Create ppxai/server/http.py with SSE streaming
3. Create vscode-extension/src/backend-http.ts with fetch/SSE client
4. Add server lifecycle management to extension
5. Implement automatic fallback to JSON-RPC
6. Benchmark and validate performance improvements
Expected Improvements: - First token latency: 50-200ms → 10-30ms - Native request cancellation via AbortController - Built-in SSE reconnection - Standard HTTP debugging tools
Benefits¶
- Single Source of Truth: All AI logic in engine layer
- Tool Independence: Tools updated without touching providers
- Provider Flexibility: Easy to add new providers
- Client Uniformity: TUI, VSCode, Web all use same API
- Testability: Engine layer can be tested without UI
- Maintainability: Clear separation of concerns
Migration Path¶
- Keep existing code working during refactoring
- Create new
engine/alongside existingppxai/ - Gradually move functionality to engine
- Update clients one at a time
- Remove old code once migration complete
Related Files¶
- Current implementation:
ppxai/client.py,perplexity_tools_prompt_based.py - Current config:
ppxai/config.py - Current server:
ppxai/server.py - VSCode Extension:
vscode-extension/
Architecture Validation (v1.13.3)¶
The layered architecture proved its value during v1.13.3 development. A fundamental change to how Gemini handles system messages required zero changes to any UI code.
The Problem¶
Gemini's API differs from OpenAI-compatible APIs:
- OpenAI: System messages go in the messages array
- Gemini: System messages must be passed via system_instruction config parameter
When tools were enabled with Gemini, the tool prompt (in a system message) was silently dropped, breaking tool functionality.
The Fix¶
Changes were isolated to the provider layer:
| File | Change | Lines Modified |
|---|---|---|
ppxai/engine/providers/gemini.py |
Extract system messages, pass via system_instruction |
~30 |
ppxai/engine/client.py |
Add native web search guidance to tool prompt | ~15 |
ppxai/config.py |
Handle UTF-8 BOM in config files | ~5 |
What Didn't Need Changes¶
| Component | Files | Reason |
|---|---|---|
| VSCode Extension | 0 | Consumes events, doesn't know provider internals |
| TUI | 0 | Uses same event handlers |
| Web App | 0 | Same HTTP/SSE contract |
| HTTP Server | 0 | Passes events through unchanged |
The Firewall Pattern¶
┌─────────────────────────────────────────────────────────┐
│ UIs (VSCode Extension, TUI, Web App) │
│ - Consume events, render responses │
│ - No knowledge of provider internals │
└─────────────────────────────────────────────────────────┘
▲
│ Events (STREAM_CHUNK, TOOL_ERROR, etc.)
│
┌─────────────────────────────────────────────────────────┐
│ Engine Client (client.py) │
│ - Orchestrates providers, tools, sessions │
│ - Emits uniform events │
└─────────────────────────────────────────────────────────┘
▲
│
┌─────────────────────────────────────────────────────────┐
│ Providers (gemini.py, perplexity.py, openai_compat.py) │
│ - API-specific logic stays here │
│ - Provider quirks isolated │
└─────────────────────────────────────────────────────────┘
The event-based contract (EventType.STREAM_CHUNK, TOOL_ERROR, etc.) acts as the firewall. Provider internals can change dramatically—Gemini's system_instruction vs OpenAI's system messages—but consumers see the same event stream.
Key Insight¶
Provider-specific quirks stay contained. The v1.7.0 refactoring that extracted the engine layer created this isolation, preventing "spaghetti effects" where one change cascades through the entire codebase.