Turn-based LLM Agent Orchestration #156

Open · opened 2025-12-01 16:00:34 +00:00 by john (Owner) · 2 comments

Turn-based LLM Agent Orchestration

Infrastructure for managing multiple LLM agents in a shared McRogueFace environment.

Overview

This issue covers the orchestration layer that:

  1. Switches grid perspective between agents
  2. Captures screenshots for VLM input
  3. Sends prompts to LLM APIs and parses responses
  4. Executes game actions based on LLM outputs
  5. Manages the speech subsystem for agent-to-agent communication
  6. Records simulation logs for deterministic replay

Core Components

1. Turn Orchestrator

class TurnOrchestrator:
    def __init__(self, grid, world_graph, agents: List[LLMAgent]):
        self.grid = grid
        self.world = world_graph
        self.agents = agents
        self.turn_number = 0
        self.simulation_log = []
    
    def run_turn(self):
        """Execute one full turn (all agents act once)."""
        self.turn_number += 1
        
        for agent in self.agents:
            step = self.run_agent_turn(agent)
            self.simulation_log.append(step)
        
        # After all agents: run NPC behaviors
        self.run_npc_behaviors()
    
    def run_agent_turn(self, agent) -> SimulationStep:
        # 1. Switch perspective to this agent
        self.prepare_agent_view(agent)
        
        # 2. Render frame and capture screenshot
        self.render_frame()
        screenshot_path = self.capture_screenshot(agent)
        
        # 3. Build context
        context = self.world.prepare_agent_context(agent, self.grid)
        context["image"] = screenshot_path
        
        # 4. Query LLM
        llm_response = agent.query_llm(context)
        
        # 5. Parse and execute action
        action = self.parse_action(llm_response)
        result = self.execute_action(agent, action)
        
        # 6. Record step
        return SimulationStep(
            turn=self.turn_number,
            agent_id=agent.id,
            perception=context,
            llm_response=llm_response,
            parsed_action=action,
            result=result
        )
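
The render_frame() and capture_screenshot() helpers above sit on top of the headless loop from #153. A minimal sketch, assuming mcrfpy.step() and automation.screenshot(path) as used in the demo scripts below (the exact import path for automation is an assumption):

import mcrfpy
from mcrfpy import automation  # import path assumed

def render_frame(self):
    # Advance the headless render/game loop one tick so the newly
    # configured perspective is actually drawn (mcrfpy.step() from #153).
    mcrfpy.step()

def capture_screenshot(self, agent) -> str:
    # One image per agent per turn; the path scheme is illustrative.
    path = f"turn_{self.turn_number:04d}_{agent.id}.png"
    automation.screenshot(path)
    return path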

2. Perspective Switching

def prepare_agent_view(self, agent):
    """Configure grid to show this agent's perspective."""
    
    # Center grid on agent
    ex, ey = agent.entity.grid_pos
    cell_w, cell_h = self.grid.cell_size
    self.grid.center = (ex * cell_w + cell_w/2, ey * cell_h + cell_h/2)
    
    # Compute FOV from agent position
    from mcrfpy import libtcod
    visible_cells = libtcod.compute_fov(
        self.grid, ex, ey, 
        agent.sight_radius,
        light_walls=True,
        algorithm=libtcod.FOV_SHADOW
    )
    
    # Apply FOV visualization (dim non-visible tiles)
    # This may use a temporary color layer or modify tile colors
    self.apply_fov_overlay(visible_cells)
    
    return visible_cells
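
apply_fov_overlay() is the piece that dims everything the agent cannot see. A sketch under two assumptions: the orchestrator owns a fog ColorLayer (self.fov_layer), and cells can be assigned individually (the item assignment below is hypothetical; ColorLayer.apply_perspective() from the multi-agent demo may replace the loop entirely):

def apply_fov_overlay(self, visible_cells):
    # Reset the fog layer to fully opaque ("unknown") so nothing bleeds
    # over from the previous agent's perspective.
    self.fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))

    # Clear the fog on cells this agent can currently see.
    for (x, y) in visible_cells:
        self.fov_layer[x, y] = mcrfpy.Color(0, 0, 0, 0)  # hypothetical per-cell setter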

3. Action Parser

LLM responses are parsed into structured actions:

VALID_ACTIONS = {
    "GO": r"GO\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)",
    "TAKE": r"TAKE\s+(\w+)",
    "DROP": r"DROP\s+(\w+)",
    "PUSH": r"PUSH\s+(\w+)\s+(NORTH|SOUTH|EAST|WEST|N|S|E|W)",
    "USE": r"USE\s+(\w+)(?:\s+ON\s+(\w+))?",
    "OPEN": r"OPEN\s+(\w+)",
    "CLOSE": r"CLOSE\s+(\w+)",
    "LOOK": r"LOOK(?:\s+AT\s+(\w+))?",
    "WAIT": r"WAIT",
    "ANNOUNCE": r"ANNOUNCE\s+[\"'](.+)[\"']",
    "SPEAK": r"SPEAK\s+[\"'](.+)[\"']",
}

def parse_action(self, llm_response: str) -> Action:
    """Extract action from LLM response."""
    # Look for action in response (may be wrapped in reasoning)
    for action_type, pattern in VALID_ACTIONS.items():
        match = re.search(pattern, llm_response, re.IGNORECASE)
        if match:
            return Action(type=action_type, args=match.groups())
    
    return Action(type="INVALID", args=(llm_response,))
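
For example, fed the kind of response shown in the demo comment below, the parser picks out the action and ignores the surrounding reasoning (orchestrator here is any object exposing parse_action):

response = "**Reasoning:** There's a rat to my right...\n**Action:** GO EAST"
orchestrator.parse_action(response)
# -> Action(type="GO", args=("EAST",))

orchestrator.parse_action("Hmm, not sure yet.")
# -> Action(type="INVALID", args=("Hmm, not sure yet.",))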

4. Action Executor

def execute_action(self, agent, action: Action) -> ActionResult:
    """Execute parsed action in the game world."""
    
    if action.type == "GO":
        direction = action.args[0]
        return self.execute_movement(agent, direction)
    
    elif action.type == "ANNOUNCE":
        message = action.args[0]
        return self.execute_announce(agent, message)
    
    elif action.type == "SPEAK":
        message = action.args[0]
        return self.execute_speak(agent, message)
    
    # ... other actions
    
    elif action.type == "INVALID":
        return ActionResult(success=False, message="Could not parse action")
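
Action and ActionResult are referenced throughout but not pinned down yet; the field sets below are a first guess that covers the usage in this issue:

from dataclasses import dataclass
from typing import List, Optional, Tuple

@dataclass
class Action:
    type: str                              # "GO", "TAKE", ..., or "INVALID"
    args: Tuple[Optional[str], ...] = ()   # regex capture groups

@dataclass
class ActionResult:
    success: bool
    message: str = ""                      # feedback echoed into the agent's next prompt
    path: Optional[List[Tuple[int, int]]] = None  # set by execute_movement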

5. Movement Execution (Multi-tile Paths)

When an agent says "GO NORTH" to leave a room, we compute the full path:

def execute_movement(self, agent, direction: str) -> ActionResult:
    """Move agent, potentially through multiple tiles."""
    
    current_room = self.world.room_at(agent.x, agent.y)
    door = self.world.get_door_in_direction(current_room, direction)
    
    if not door:
        return ActionResult(success=False, message="No exit in that direction")
    
    if door.locked:
        return ActionResult(success=False, message="The door is locked")
    
    # Find path to door and through to next room
    target_room = self.world.rooms[door.room_b]
    entry_point = self.world.get_entry_point(target_room, door)
    
    path = libtcod.find_path(self.grid, agent.x, agent.y, *entry_point)
    
    if not path:
        return ActionResult(success=False, message="Path is blocked")
    
    # In simulation mode: teleport
    # In playback mode: this path will be animated
    agent.entity.grid_pos = entry_point
    
    return ActionResult(
        success=True, 
        message=f"You walk to {target_room.display_name}",
        path=path  # Stored for animation playback
    )
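
One detail the parser leaves open: the GO/PUSH regexes accept both long ("NORTH") and short ("N") forms, while the world graph presumably keys doors by the long form. A small normalization helper (the long-form convention is an assumption) keeps the executor simple:

_DIRECTION_ALIASES = {"N": "NORTH", "S": "SOUTH", "E": "EAST", "W": "WEST"}

def normalize_direction(token: str) -> str:
    """Map 'n'/'N' to 'NORTH', etc.; full names pass through unchanged."""
    token = token.upper()
    return _DIRECTION_ALIASES.get(token, token)

# e.g. at the top of execute_movement:
#     direction = normalize_direction(direction)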

6. Speech Subsystem

class SpeechChannel:
    """Manages agent-to-agent communication."""
    
    def __init__(self, world_graph, agents):
        self.world = world_graph
        self.agents = {a.id: a for a in agents}
        self.turn_number = 0  # kept in sync by the orchestrator each turn
    
    def announce(self, speaker, message: str):
        """Broadcast to all agents in the same room."""
        room = self.world.room_at(speaker.x, speaker.y)
        
        for agent in self.agents.values():
            if agent.id == speaker.id:
                continue
            if self.world.room_at(agent.x, agent.y) == room:
                agent.receive_message(Message(
                    sender=speaker.id,
                    content=message,
                    type="announce",
                    turn=self.turn_number
                ))
    
    def speak(self, speaker, message: str, radius: int = 4):
        """Send to agents within radius tiles."""
        sx, sy = speaker.x, speaker.y
        
        for agent in self.agents.values():
            if agent.id == speaker.id:
                continue
            ax, ay = agent.x, agent.y
            distance = abs(ax - sx) + abs(ay - sy)  # Manhattan distance
            
            if distance <= radius:
                agent.receive_message(Message(
                    sender=speaker.id,
                    content=message,
                    type="speak",
                    turn=self.turn_number,
                    distance=distance
                ))
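
The ANNOUNCE/SPEAK branches of the executor then just delegate to the channel. A sketch, assuming the orchestrator holds a SpeechChannel instance as self.speech:

def execute_announce(self, agent, message: str) -> ActionResult:
    # Same-room broadcast; listeners see it in their context next turn.
    self.speech.announce(agent, message)
    return ActionResult(success=True, message=f'You announce: "{message}"')

def execute_speak(self, agent, message: str) -> ActionResult:
    # Radius-limited speech (Manhattan distance, see SpeechChannel.speak).
    self.speech.speak(agent, message)
    return ActionResult(success=True, message=f'You say: "{message}"')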

7. Simulation Log Format

@dataclass
class SimulationStep:
    turn: int
    agent_id: str
    perception: Dict          # Context shown to LLM
    llm_response: str         # Raw LLM output
    parsed_action: Action     # Structured action
    result: ActionResult      # Outcome
    path: Optional[List[Tuple[int, int]]] = None   # Movement path for animation
    random_state: Optional[Dict] = None            # NPC random seeds this turn

@dataclass
class SimulationLog:
    steps: List[SimulationStep]
    initial_state: bytes      # Serialized starting world state
    final_state: bytes        # Serialized ending world state
    metadata: Dict            # Model names, timestamps, etc.
    
    def save(self, path: str):
        """Save log for replay."""
        
    @classmethod
    def load(cls, path: str) -> 'SimulationLog':
        """Load log for replay."""

Blocking Issues

  • #153 - Separate render loop from game state loop (required for clean perspective switching during simulation)
  • #16 - Entity knowledge contents (per-agent FOV and knowledge tracking)

Benefits From

  • #113 - Batch Operations for Grid (efficient FOV overlay application)
  • #114 - CellView API (convenient grid state queries)

Parent Issue

Part of #154 - Grounded Multi-Agent Testbed

john added the Major Feature, system:python-binding, priority:tier1-active, workflow:blocked labels 2025-12-01 16:15:42 +00:00
john (Author, Owner) commented:

Initial VLLM Integration Demo Complete

Created tests/vllm_demo.py as a proof-of-concept demonstrating the core infrastructure:

What's Working

  1. Headless screenshot capture - Using the new mcrfpy.step() + automation.screenshot() from #153
  2. VLLM integration - Successfully sends screenshots to local Gemma 3 (32k context)
  3. Vision model response - VLLM correctly identifies:
    • The player agent (humanoid figure)
    • NPC creatures (the "rat")
    • Dungeon walls and floor tiles
  4. Action selection - VLLM chooses actions like "GO EAST" with reasoning

Demo Scene Setup

  • 20x15 tile grid, 1014px wide display
  • Uses Crypt of Sokoban sprites (kenney_TD_MR_IP.png)
  • Player sprite 84, enemy sprite 123
  • Wall divider with door opening for navigation interest

Example VLLM Response

Okay, let's analyze this.

**Reasoning:**
The room is relatively open, and I'm currently positioned in the top-left corner. 
There's a rat to my right, which could be a potential threat...
Moving East would offer a little more cover.

**Action:** GO EAST
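
For reference, the query side can be a single chat-completion call. The sketch below assumes the local model is served behind an OpenAI-compatible endpoint; the URL, model name, and prompt wiring are placeholders, not necessarily what tests/vllm_demo.py does:

import base64
import requests

def query_vlm(screenshot_path: str, prompt: str) -> str:
    """Send a screenshot + prompt to a local OpenAI-compatible endpoint (assumed)."""
    with open(screenshot_path, "rb") as f:
        image_b64 = base64.b64encode(f.read()).decode()
    resp = requests.post(
        "http://localhost:8000/v1/chat/completions",  # placeholder endpoint
        json={
            "model": "gemma-3",                        # placeholder model name
            "messages": [{
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {"type": "image_url",
                     "image_url": {"url": f"data:image/png;base64,{image_b64}"}},
                ],
            }],
        },
        timeout=120,
    )
    return resp.json()["choices"][0]["message"]["content"]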

Next Steps for Full Implementation

  • Action parser (extract GO/WAIT/LOOK from response)
  • Action executor (actually move entities)
  • Turn loop (multiple agents taking turns)
  • Perspective switching (center camera on active agent)
  • FOV computation for each agent
  • Speech/announce subsystem for agent communication

This demo unblocks the core rendering loop dependency. The remaining work is Python-side orchestration.

john (Author, Owner) commented:

Multi-Agent VLLM Demo Complete

Committed and pushed multi-agent perspective cycling demo (4713b62, b1b3773).

New Files in tests/vllm_demo/

File                     Description
0_basic_vllm_demo.py     Single agent with FOV, grounded text, VLLM query
1_multi_agent_demo.py    Three agents with independent perspective cycling

Features Demonstrated

  1. Per-Agent FOV via ColorLayer.apply_perspective()

    • Fog layer reset between perspective switches prevents state bleed
    • Each agent sees only what's in their field of view
  2. Perspective Cycling

    for agent in agents:
        fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))  # Reset to unknown
        fov_layer.apply_perspective(entity=agent.entity, ...)
        agent.entity.update_visibility()
        px, py = agent.entity.grid_pos              # agent's tile coordinates
        grid.center = (px * 16 + 8, py * 16 + 8)    # 16px tiles; center camera on agent
        automation.screenshot(f"{agent.name}_view.png")
    
  3. Grounded Text Generation

    • get_visible_entities() checks grid.is_in_fov(x, y) for each entity (see the sketch after this list)
    • Generates prompts like "You see a rat to the west and a knight to the south."
  4. Sequential VLLM Queries

    • Each agent gets their own screenshot + grounded prompt
    • Different perspectives lead to different action choices
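
A rough shape for the grounded-text helper from item 3 (entity fields and direction wording are assumptions; ties are resolved crudely in favor of the vertical axis):

def get_visible_entities(grid, observer, entities):
    """Describe every entity inside the observer's field of view."""
    ox, oy = observer.grid_pos
    seen = []
    for ent in entities:
        if ent is observer:
            continue
        ex, ey = ent.grid_pos
        if not grid.is_in_fov(ex, ey):
            continue
        direction = ("north" if ey < oy else "south" if ey > oy
                     else "east" if ex > ox else "west")
        seen.append(f"a {ent.name} to the {direction}")
    return seen

# e.g. "You see " + " and ".join(get_visible_entities(grid, agent.entity, entities)) + "."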

Sample Output

Agent        Position       Sees              Action
Wizard       (4,7) left     Rat               GO EAST
Blacksmith   (18,5) right   Rat, Knight       WAIT
Knight       (18,10) right  Rat, Blacksmith   GO NORTH

Remaining Work for Full #156

  • Action parser (regex extraction of GO/WAIT/LOOK)
  • Action executor (move entities based on parsed action)
  • Turn loop (advance game state, re-render, query again)
  • Speech/announce subsystem
  • Simulation logging for replay

The blocking dependency (#153) is fully resolved. Core perspective/screenshot infrastructure is production-ready.
