diff --git a/tests/vllm_demo/1_multi_agent_demo.py b/tests/vllm_demo/1_multi_agent_demo.py
index 50e06fb..debc98e 100644
--- a/tests/vllm_demo/1_multi_agent_demo.py
+++ b/tests/vllm_demo/1_multi_agent_demo.py
@@ -22,6 +22,9 @@
 import base64
 import os
 import random
+from action_parser import parse_action
+from action_executor import ActionExecutor
+
 # VLLM configuration
 VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions"
 SCREENSHOT_DIR = "/tmp/vllm_multi_agent"
@@ -284,6 +287,9 @@ def run_demo():
     # Setup scene
     grid, fov_layer, agents, rat = setup_scene()
 
+    # Create action executor
+    executor = ActionExecutor(grid)
+
     # Cycle through each agent's perspective
     for i, agent in enumerate(agents):
         print(f"\n{'='*70}")
@@ -319,6 +325,21 @@
         print(f"\n{agent.name}'s Response:\n{response}")
         print()
 
+        # Parse and execute action
+        print(f"--- Action Execution ---")
+        action = parse_action(response)
+        print(f"Parsed action: {action.type.value} {action.args}")
+
+        result = executor.execute(agent, action)
+        if result.success:
+            print(f"SUCCESS: {result.message}")
+            if result.new_position:
+                # Update perspective after movement
+                switch_perspective(grid, fov_layer, agent)
+                mcrfpy.step(0.016)
+        else:
+            print(f"FAILED: {result.message}")
+
     print("\n" + "=" * 70)
     print("Multi-Agent Demo Complete")
     print("=" * 70)
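Note for reviewers: the hunks above (and the new demos below) import `parse_action` from `action_parser` and `ActionExecutor` from `action_executor`, neither of which is included in this diff. The following is a minimal, hypothetical sketch of the interface the call sites assume, inferred only from usage (`action.type.value`, `action.args`, `result.success`, `result.message`, `result.new_position`, `result.path`); the actual #156 modules may differ.

# Hypothetical sketch -- inferred from call sites in this diff, not the real #156 code.
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Tuple
import re

class ActionType(Enum):
    MOVE = "MOVE"
    TAKE = "TAKE"
    WAIT = "WAIT"

@dataclass
class Action:
    type: ActionType
    args: Tuple[str, ...] = ()

@dataclass
class ActionResult:
    success: bool
    message: str
    new_position: Optional[Tuple[int, int]] = None
    path: Optional[List[Tuple[int, int]]] = None  # step-by-step path, for replay

def parse_action(llm_response: str) -> Action:
    """Pull an 'Action: VERB arg ...' declaration out of raw LLM output."""
    match = re.search(r"Action:\s*([A-Za-z_]+)\s*(.*)", llm_response)
    if not match:
        return Action(ActionType.WAIT)
    verb, rest = match.group(1).upper(), match.group(2).strip()
    try:
        return Action(ActionType(verb), tuple(rest.split()))
    except ValueError:  # verb is not a known ActionType
        return Action(ActionType.WAIT)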
diff --git a/tests/vllm_demo/2_integrated_demo.py b/tests/vllm_demo/2_integrated_demo.py
new file mode 100644
index 0000000..f499079
--- /dev/null
+++ b/tests/vllm_demo/2_integrated_demo.py
@@ -0,0 +1,399 @@
+#!/usr/bin/env python3
+"""
+Integrated VLLM Demo
+====================
+
+Combines:
+- WorldGraph for structured room descriptions (#155)
+- Action parsing and execution (#156)
+- Per-agent perspective rendering
+
+This is the foundation for multi-turn simulation.
+"""
+
+import sys
+import os
+# Add the vllm_demo directory to path for imports
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+import mcrfpy
+from mcrfpy import automation
+import requests
+import base64
+
+from world_graph import (
+    WorldGraph, Room, Door, WorldObject, Direction, AgentInfo,
+    create_two_room_scenario
+)
+from action_parser import parse_action, ActionType
+from action_executor import ActionExecutor
+
+# Configuration
+VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions"
+SCREENSHOT_DIR = "/tmp/vllm_integrated"
+
+# Sprite constants
+FLOOR_TILE = 0
+WALL_TILE = 40
+WIZARD_SPRITE = 84
+KNIGHT_SPRITE = 96
+
+
+class Agent:
+    """Agent wrapper with WorldGraph integration."""
+
+    def __init__(self, name: str, display_name: str, entity, world: WorldGraph):
+        self.name = name
+        self.display_name = display_name
+        self.entity = entity
+        self.world = world
+        self.message_history = []  # For speech system (future)
+
+    @property
+    def pos(self) -> tuple:
+        return (int(self.entity.pos[0]), int(self.entity.pos[1]))
+
+    @property
+    def current_room(self) -> str:
+        """Get the name of the room this agent is in."""
+        room = self.world.room_at(*self.pos)
+        return room.name if room else None
+
+    def get_context(self, visible_agents: list) -> dict:
+        """
+        Build complete context for LLM query.
+
+        Args:
+            visible_agents: List of Agent objects visible to this agent
+
+        Returns:
+            Dict with location description, available actions, messages
+        """
+        room_name = self.current_room
+
+        # Convert Agent objects to AgentInfo for WorldGraph
+        agent_infos = [
+            AgentInfo(
+                name=a.name,
+                display_name=a.display_name,
+                position=a.pos,
+                is_player=(a.name == self.name)
+            )
+            for a in visible_agents
+        ]
+
+        return {
+            "location": self.world.describe_room(
+                room_name,
+                visible_agents=agent_infos,
+                observer_name=self.name
+            ),
+            "available_actions": self.world.get_available_actions(room_name),
+            "recent_messages": self.message_history[-5:],
+        }
+
+
+def file_to_base64(file_path):
+    """Convert image file to base64 string."""
+    with open(file_path, 'rb') as f:
+        return base64.b64encode(f.read()).decode('utf-8')
+
+
+def llm_chat_completion(messages: list):
+    """Send chat completion request to local LLM."""
+    try:
+        response = requests.post(VLLM_URL, json={'messages': messages}, timeout=60)
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        return {"error": str(e)}
+
+
+def message_with_image(text, image_path):
+    """Create a message with embedded image for vision models."""
+    image_data = file_to_base64(image_path)
+    return {
+        "role": "user",
+        "content": [
+            {"type": "text", "text": text},
+            {"type": "image_url", "image_url": {"url": "data:image/png;base64," + image_data}}
+        ]
+    }
+
+
+def setup_scene_from_world(world: WorldGraph):
+    """
+    Create McRogueFace scene from WorldGraph.
+
+    Carves out rooms and places doors based on WorldGraph data.
+    """
+    mcrfpy.createScene("integrated_demo")
+    mcrfpy.setScene("integrated_demo")
+    ui = mcrfpy.sceneUI("integrated_demo")
+
+    texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16)
+
+    # Create grid sized for the world (with margin)
+    grid = mcrfpy.Grid(
+        grid_size=(25, 15),
+        texture=texture,
+        pos=(5, 5),
+        size=(1014, 700)
+    )
+    grid.fill_color = mcrfpy.Color(20, 20, 30)
+    grid.zoom = 2.0
+    ui.append(grid)
+
+    # Initialize all tiles as walls
+    for x in range(25):
+        for y in range(15):
+            point = grid.at(x, y)
+            point.tilesprite = WALL_TILE
+            point.walkable = False
+            point.transparent = False
+
+    # Carve out rooms from WorldGraph
+    for room in world.rooms.values():
+        for rx in range(room.x, room.x + room.width):
+            for ry in range(room.y, room.y + room.height):
+                if 0 <= rx < 25 and 0 <= ry < 15:
+                    point = grid.at(rx, ry)
+                    point.tilesprite = FLOOR_TILE
+                    point.walkable = True
+                    point.transparent = True
+
+    # Place doors (carve corridor between rooms)
+    for door in world.doors:
+        dx, dy = door.position
+        if 0 <= dx < 25 and 0 <= dy < 15:
+            point = grid.at(dx, dy)
+            point.tilesprite = FLOOR_TILE
+            point.walkable = not door.locked
+            point.transparent = True
+
+    # Create FOV layer for fog of war
+    fov_layer = grid.add_layer('color', z_index=10)
+    fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
+
+    return grid, fov_layer, texture
+
+
+def create_agents(grid, world: WorldGraph, texture) -> list:
+    """Create agent entities in their starting rooms."""
+    agents = []
+
+    # Agent A: Wizard in guard_room
+    guard_room = world.rooms["guard_room"]
+    wizard_entity = mcrfpy.Entity(
+        grid_pos=guard_room.center,
+        texture=texture,
+        sprite_index=WIZARD_SPRITE
+    )
+    grid.entities.append(wizard_entity)
+    agents.append(Agent("Wizard", "a wizard", wizard_entity, world))
+
+    # Agent B: Knight in armory
+    armory = world.rooms["armory"]
+    knight_entity = mcrfpy.Entity(
+        grid_pos=armory.center,
+        texture=texture,
+        sprite_index=KNIGHT_SPRITE
+    )
+    grid.entities.append(knight_entity)
+    agents.append(Agent("Knight", "a knight", knight_entity, world))
+
+    return agents
+
+
+def switch_perspective(grid, fov_layer, agent):
+    """Switch grid view to an agent's perspective."""
+    # Reset fog layer to all unknown (black)
+    fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
+
+    # Apply this agent's perspective
+    fov_layer.apply_perspective(
+        entity=agent.entity,
+        visible=mcrfpy.Color(0, 0, 0, 0),
+        discovered=mcrfpy.Color(40, 40, 60, 180),
+        unknown=mcrfpy.Color(0, 0, 0, 255)
+    )
+
+    # Update visibility from agent's position
+    agent.entity.update_visibility()
+
+    # Center camera on this agent
+    px, py = agent.pos
+    grid.center = (px * 16 + 8, py * 16 + 8)
+
+
+def get_visible_agents(grid, observer, all_agents) -> list:
+    """Get agents visible to the observer based on FOV."""
+    visible = []
+    for agent in all_agents:
+        if agent.name == observer.name:
+            continue
+        ax, ay = agent.pos
+        if grid.is_in_fov(ax, ay):
+            visible.append(agent)
+    return visible
+
+
+def query_agent_llm(agent, screenshot_path, context) -> str:
+    """
+    Query VLLM for agent's action using WorldGraph context.
+
+    This uses the structured context from WorldGraph instead of
+    ad-hoc grounded prompts.
+    """
+    system_prompt = f"""You are {agent.display_name} in a roguelike dungeon game.
+You see the world through screenshots and receive text descriptions.
+Your goal is to explore and interact with your environment.
+Always end your response with a clear action declaration: "Action: <ACTION>"
+"""
+
+    # Build the user prompt with WorldGraph context
+    actions_str = ", ".join(context["available_actions"])
+
+    user_prompt = f"""{context["location"]}
+
+Available actions: {actions_str}
+
+Look at the screenshot showing your current view. The dark areas are outside your field of vision.
+
+What would you like to do? State your reasoning briefly (1-2 sentences), then declare your action.
+Example: "I see a key on the ground that might be useful. Action: TAKE brass_key"
+"""
+
+    messages = [
+        {"role": "system", "content": system_prompt},
+        message_with_image(user_prompt, screenshot_path)
+    ]
+
+    resp = llm_chat_completion(messages)
+
+    if "error" in resp:
+        return f"[VLLM Error: {resp['error']}]"
+    return resp.get('choices', [{}])[0].get('message', {}).get('content', 'No response')
+
+
+def run_single_turn(grid, fov_layer, agents, executor, turn_num):
+    """
+    Execute one turn for all agents.
+
+    Each agent:
+    1. Gets their perspective rendered
+    2. Receives WorldGraph context
+    3. Queries LLM for action
+    4. Executes the action
+    """
+    print(f"\n{'='*70}")
+    print(f"TURN {turn_num}")
+    print("=" * 70)
+
+    results = []
+
+    for agent in agents:
+        print(f"\n--- {agent.name}'s Turn ---")
+        print(f"Position: {agent.pos} | Room: {agent.current_room}")
+
+        # Switch perspective to this agent
+        switch_perspective(grid, fov_layer, agent)
+        mcrfpy.step(0.016)
+
+        # Take screenshot
+        screenshot_path = os.path.join(
+            SCREENSHOT_DIR,
+            f"turn{turn_num}_{agent.name.lower()}.png"
+        )
+        automation.screenshot(screenshot_path)
+        print(f"Screenshot: {screenshot_path}")
+
+        # Get context using WorldGraph
+        visible = get_visible_agents(grid, agent, agents)
+        context = agent.get_context(visible + [agent])  # Include self for filtering
+
+        print(f"\nContext from WorldGraph:")
+        print(f"  Location: {context['location']}")
+        print(f"  Actions: {context['available_actions']}")
+
+        # Query LLM
+        print(f"\nQuerying VLLM...")
+        response = query_agent_llm(agent, screenshot_path, context)
+        print(f"Response: {response[:300]}{'...' if len(response) > 300 else ''}")
+
+        # Parse and execute action
+        action = parse_action(response)
+        print(f"\nParsed: {action.type.value} {action.args}")
+
+        result = executor.execute(agent, action)
+        status = "SUCCESS" if result.success else "FAILED"
+        print(f"Result: {status} - {result.message}")
+
+        results.append({
+            "agent": agent.name,
+            "room": agent.current_room,
+            "context": context,
+            "response": response,
+            "action": action,
+            "result": result
+        })
+
+    return results
+
+
+def run_demo():
+    """Main demo: single integrated turn with WorldGraph context."""
+    print("=" * 70)
+    print("Integrated WorldGraph + Action Demo")
+    print("=" * 70)
+
+    os.makedirs(SCREENSHOT_DIR, exist_ok=True)
+
+    # Create world from WorldGraph factory
+    print("\nCreating world from WorldGraph...")
+    world = create_two_room_scenario()
+    print(f"  Rooms: {list(world.rooms.keys())}")
+    print(f"  Doors: {len(world.doors)}")
+    print(f"  Objects: {list(world.objects.keys())}")
+
+    # Setup scene from WorldGraph
+    print("\nSetting up scene...")
+    grid, fov_layer, texture = setup_scene_from_world(world)
+
+    # Create agents
+    print("\nCreating agents...")
+    agents = create_agents(grid, world, texture)
+    for agent in agents:
+        print(f"  {agent.name} at {agent.pos} in {agent.current_room}")
+
+    # Create executor
+    executor = ActionExecutor(grid)
+
+    # Run one turn
+    results = run_single_turn(grid, fov_layer, agents, executor, turn_num=1)
+
+    # Summary
+    print("\n" + "=" * 70)
+    print("TURN SUMMARY")
+    print("=" * 70)
+    for r in results:
+        status = "OK" if r["result"].success else "FAIL"
+        print(f"  {r['agent']}: {r['action'].type.value} -> {status}")
+        if r["result"].new_position:
+            print(f"    New position: {r['result'].new_position}")
+
+    print("\n" + "=" * 70)
+    print("Demo Complete")
+    print("=" * 70)
+
+    return True
+
+
+if __name__ == "__main__":
+    try:
+        success = run_demo()
+        print("\nPASS" if success else "\nFAIL")
+        sys.exit(0 if success else 1)
+    except Exception as e:
+        import traceback
+        traceback.print_exc()
+        sys.exit(1)
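Similarly, both demos build on the `world_graph` module from #155, which is also not part of this diff. Below is a rough, hypothetical sketch of the shapes the call sites assume; every field and method name is inferred from usage in this patch (`rooms`, `doors`, `objects`, `room_at`, `describe_room`, `get_available_actions`, `AgentInfo`), not taken from the real module.

# Hypothetical sketch of world_graph -- shapes inferred from this diff's call sites.
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

@dataclass
class AgentInfo:
    name: str
    display_name: str
    position: Tuple[int, int]
    is_player: bool = False

@dataclass
class Room:
    name: str
    x: int
    y: int
    width: int
    height: int

    @property
    def center(self) -> Tuple[int, int]:
        return (self.x + self.width // 2, self.y + self.height // 2)

@dataclass
class Door:
    position: Tuple[int, int]
    locked: bool = False

class WorldGraph:
    """Rooms, doors, and objects, plus text helpers for LLM prompts."""

    def __init__(self):
        self.rooms: Dict[str, Room] = {}
        self.doors: List[Door] = []
        self.objects: Dict[str, object] = {}

    def room_at(self, x: int, y: int) -> Optional[Room]:
        """Return the room containing (x, y), or None for walls/corridors."""
        for room in self.rooms.values():
            if room.x <= x < room.x + room.width and room.y <= y < room.y + room.height:
                return room
        return None

    def describe_room(self, room_name: str, visible_agents=None, observer_name=None) -> str:
        """Compose the natural-language location text fed to the LLM."""
        others = [a.display_name for a in (visible_agents or [])
                  if a.name != observer_name]
        desc = f"You are in the {room_name}."
        if others:
            desc += " You see " + ", ".join(others) + "."
        return desc

    def get_available_actions(self, room_name: str) -> List[str]:
        """Placeholder: the real module presumably derives these from doors/objects."""
        return ["MOVE <direction>", "TAKE <object>", "WAIT"]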
diff --git a/tests/vllm_demo/3_multi_turn_demo.py b/tests/vllm_demo/3_multi_turn_demo.py
new file mode 100644
index 0000000..3e830c3
--- /dev/null
+++ b/tests/vllm_demo/3_multi_turn_demo.py
@@ -0,0 +1,318 @@
+#!/usr/bin/env python3
+"""
+Multi-Turn Simulation Demo
+==========================
+
+Runs multiple turns of agent interaction with full logging.
+This is the Phase 1 implementation from issue #154.
+
+Two agents start in separate rooms and can move, observe,
+and (in future versions) communicate to solve puzzles.
+"""
+
+import sys
+import os
+# Add the vllm_demo directory to path for imports
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+import mcrfpy
+from mcrfpy import automation
+import requests
+import base64
+
+from world_graph import (
+    WorldGraph, Room, Door, WorldObject, Direction, AgentInfo,
+    create_two_room_scenario, create_button_door_scenario
+)
+from action_parser import parse_action
+from action_executor import ActionExecutor
+from turn_orchestrator import TurnOrchestrator, SimulationLog
+
+# Configuration
+VLLM_URL = "http://192.168.1.100:8100/v1/chat/completions"
+SCREENSHOT_DIR = "/tmp/vllm_multi_turn"
+LOG_PATH = "/tmp/vllm_multi_turn/simulation_log.json"
+MAX_TURNS = 5
+
+# Sprites
+FLOOR_TILE = 0
+WALL_TILE = 40
+WIZARD_SPRITE = 84
+KNIGHT_SPRITE = 96
+
+
+class Agent:
+    """Agent with WorldGraph integration."""
+
+    def __init__(self, name: str, display_name: str, entity, world: WorldGraph):
+        self.name = name
+        self.display_name = display_name
+        self.entity = entity
+        self.world = world
+        self.message_history = []
+
+    @property
+    def pos(self) -> tuple:
+        return (int(self.entity.pos[0]), int(self.entity.pos[1]))
+
+    @property
+    def current_room(self) -> str:
+        room = self.world.room_at(*self.pos)
+        return room.name if room else None
+
+    def get_context(self, visible_agents: list) -> dict:
+        """Build context for LLM query."""
+        room_name = self.current_room
+        agent_infos = [
+            AgentInfo(
+                name=a.name,
+                display_name=a.display_name,
+                position=a.pos,
+                is_player=(a.name == self.name)
+            )
+            for a in visible_agents
+        ]
+        return {
+            "location": self.world.describe_room(room_name, agent_infos, self.name),
+            "available_actions": self.world.get_available_actions(room_name),
+            "recent_messages": self.message_history[-5:],
+        }
+
+
+def file_to_base64(path: str) -> str:
+    """Convert file to base64 string."""
+    with open(path, 'rb') as f:
+        return base64.b64encode(f.read()).decode('utf-8')
+
+
+def llm_query(agent, screenshot_path: str, context: dict) -> str:
+    """
+    Query VLLM for agent action.
+
+    This function is passed to TurnOrchestrator as the LLM query callback.
+    """
+    system_prompt = f"""You are {agent.display_name} exploring a dungeon.
+You receive visual and text information about your surroundings.
+Your goal is to explore, find items, and interact with the environment.
+Always end your response with: Action: <ACTION>"""
+
+    actions_str = ", ".join(context["available_actions"])
+
+    user_prompt = f"""{context["location"]}
+
+Available actions: {actions_str}
+
+[Screenshot attached showing your current view - dark areas are outside your vision]
+
+What do you do? Brief reasoning (1-2 sentences), then Action: <ACTION>"""
+
+    messages = [
+        {"role": "system", "content": system_prompt},
+        {
+            "role": "user",
+            "content": [
+                {"type": "text", "text": user_prompt},
+                {"type": "image_url", "image_url": {
+                    "url": "data:image/png;base64," + file_to_base64(screenshot_path)
+                }}
+            ]
+        }
+    ]
+
+    try:
+        resp = requests.post(VLLM_URL, json={'messages': messages}, timeout=60)
+        data = resp.json()
+        if "error" in data:
+            return f"[VLLM Error: {data['error']}]"
+        return data.get('choices', [{}])[0].get('message', {}).get('content', 'No response')
+    except Exception as e:
+        return f"[Connection Error: {e}]"
+
+
+def setup_scene(world: WorldGraph):
+    """Create McRogueFace scene from WorldGraph."""
+    mcrfpy.createScene("multi_turn")
+    mcrfpy.setScene("multi_turn")
+    ui = mcrfpy.sceneUI("multi_turn")
+
+    texture = mcrfpy.Texture("assets/kenney_TD_MR_IP.png", 16, 16)
+
+    grid = mcrfpy.Grid(
+        grid_size=(25, 15),
+        texture=texture,
+        pos=(5, 5),
+        size=(1014, 700)
+    )
+    grid.fill_color = mcrfpy.Color(20, 20, 30)
+    grid.zoom = 2.0
+    ui.append(grid)
+
+    # Initialize all as walls
+    for x in range(25):
+        for y in range(15):
+            p = grid.at(x, y)
+            p.tilesprite = WALL_TILE
+            p.walkable = False
+            p.transparent = False
+
+    # Carve rooms from WorldGraph
+    for room in world.rooms.values():
+        for rx in range(room.x, room.x + room.width):
+            for ry in range(room.y, room.y + room.height):
+                if 0 <= rx < 25 and 0 <= ry < 15:
+                    p = grid.at(rx, ry)
+                    p.tilesprite = FLOOR_TILE
+                    p.walkable = True
+                    p.transparent = True
+
+    # Place doors
+    for door in world.doors:
+        dx, dy = door.position
+        if 0 <= dx < 25 and 0 <= dy < 15:
+            p = grid.at(dx, dy)
+            p.tilesprite = FLOOR_TILE
+            p.walkable = not door.locked
+            p.transparent = True
+
+    # FOV layer
+    fov_layer = grid.add_layer('color', z_index=10)
+    fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
+
+    return grid, fov_layer, texture
+
+
+def create_agents(grid, world: WorldGraph, texture) -> list:
+    """Create agents in their starting rooms."""
+    agents = []
+
+    # Wizard in guard_room (left)
+    room_a = world.rooms["guard_room"]
+    wizard = mcrfpy.Entity(
+        grid_pos=room_a.center,
+        texture=texture,
+        sprite_index=WIZARD_SPRITE
+    )
+    grid.entities.append(wizard)
+    agents.append(Agent("Wizard", "a wizard", wizard, world))
+
+    # Knight in armory (right)
+    room_b = world.rooms["armory"]
+    knight = mcrfpy.Entity(
+        grid_pos=room_b.center,
+        texture=texture,
+        sprite_index=KNIGHT_SPRITE
+    )
+    grid.entities.append(knight)
+    agents.append(Agent("Knight", "a knight", knight, world))
+
+    return agents
+
+
+def run_demo():
+    """Run multi-turn simulation."""
+    print("=" * 70)
+    print("Multi-Turn Simulation Demo")
+    print(f"Running up to {MAX_TURNS} turns with 2 agents")
+    print("=" * 70)
+
+    os.makedirs(SCREENSHOT_DIR, exist_ok=True)
+
+    # Create world
+    print("\nCreating world...")
+    world = create_two_room_scenario()
+    print(f"  Rooms: {list(world.rooms.keys())}")
+    print(f"  Objects: {list(world.objects.keys())}")
+
+    # Setup scene
+    print("\nSetting up scene...")
+    grid, fov_layer, texture = setup_scene(world)
+
+    # Create agents
+    print("\nCreating agents...")
+    agents = create_agents(grid, world, texture)
+    for agent in agents:
+        print(f"  {agent.name} at {agent.pos} in {agent.current_room}")
+
+    # Create orchestrator
+    orchestrator = TurnOrchestrator(
+        grid=grid,
+        fov_layer=fov_layer,
+        world=world,
+        agents=agents,
+        screenshot_dir=SCREENSHOT_DIR,
+        llm_query_fn=llm_query
+    )
+
+    # Optional: Define a stop condition
+    def agents_met(orch):
+        """Stop when agents are in the same room."""
+        return orch.agents_in_same_room()
+
+    # Run simulation
+    log = orchestrator.run_simulation(
+        max_turns=MAX_TURNS,
+        stop_condition=None  # Or use agents_met for early stopping
+    )
+
+    # Save log
+    log.save(LOG_PATH)
+
+    # Print summary
+    print("\n" + "=" * 70)
+    print(log.summary())
+    print("=" * 70)
+
+    # Show final positions
+    print("\nFinal Agent Positions:")
+    for agent in agents:
+        print(f"  {agent.name}: {agent.pos} in {agent.current_room}")
+
+    print(f"\nScreenshots saved to: {SCREENSHOT_DIR}/")
+    print(f"Simulation log saved to: {LOG_PATH}")
+
+    return True
+
+
+def replay_log(log_path: str):
+    """
+    Replay a simulation from a log file.
+
+    This is a utility function for reviewing past simulations.
+    """
+    print(f"Loading simulation from: {log_path}")
+    log = SimulationLog.load(log_path)
+
+    print("\n" + log.summary())
+
+    print("\nTurn-by-Turn Replay:")
+    print("-" * 50)
+
+    current_turn = 0
+    for step in log.steps:
+        if step.turn != current_turn:
+            current_turn = step.turn
+            print(f"\n=== Turn {current_turn} ===")
+
+        status = "OK" if step.result_success else "FAIL"
+        print(f"  {step.agent_id}: {step.parsed_action_type} {step.parsed_action_args}")
+        print(f"    {status}: {step.result_message}")
+        if step.new_position:
+            print(f"    Moved to: {step.new_position}")
+
+
+if __name__ == "__main__":
+    # Check for replay mode
+    if len(sys.argv) > 1 and sys.argv[1] == "--replay":
+        log_file = sys.argv[2] if len(sys.argv) > 2 else LOG_PATH
+        replay_log(log_file)
+        sys.exit(0)
+
+    # Normal execution
+    try:
+        success = run_demo()
+        print("\nPASS" if success else "\nFAIL")
+        sys.exit(0 if success else 1)
+    except Exception as e:
+        import traceback
+        traceback.print_exc()
+        sys.exit(1)
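A note on the `stop_condition=None` argument above: `TurnOrchestrator.run_simulation` (defined below) calls the condition with the orchestrator after each full turn and ends the run early when it returns True, so any predicate over orchestrator state works. For example, a hypothetical drop-in replacement inside `run_demo` that stops once the Wizard reaches the armory:

# Hypothetical stop condition -- uses only orchestrator APIs defined in this diff.
def wizard_reached_armory(orch) -> bool:
    """Stop once the Wizard is standing inside the armory."""
    wx, wy = orch.get_agent_positions()["Wizard"]
    room = orch.world.room_at(wx, wy)
    return room is not None and room.name == "armory"

log = orchestrator.run_simulation(max_turns=MAX_TURNS,
                                  stop_condition=wizard_reached_armory)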
+""" + +import json +import os +from dataclasses import dataclass, asdict, field +from typing import List, Dict, Any, Optional, Callable +from datetime import datetime + +from world_graph import WorldGraph, AgentInfo +from action_parser import Action, ActionType, parse_action +from action_executor import ActionExecutor, ActionResult + + +@dataclass +class SimulationStep: + """Record of one agent's turn.""" + turn: int + agent_id: str + agent_position: tuple + room: str + perception: Dict[str, Any] # Context shown to LLM + llm_response: str # Raw LLM output + parsed_action_type: str # Action type as string + parsed_action_args: tuple # Action arguments + result_success: bool + result_message: str + new_position: Optional[tuple] = None + path: Optional[List[tuple]] = None # For animation replay + timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) + + +@dataclass +class SimulationLog: + """Complete simulation record for replay and analysis.""" + metadata: Dict[str, Any] + steps: List[SimulationStep] = field(default_factory=list) + + def save(self, path: str): + """Save log to JSON file.""" + data = { + "metadata": self.metadata, + "steps": [asdict(s) for s in self.steps] + } + with open(path, 'w') as f: + json.dump(data, f, indent=2, default=str) + print(f"Simulation log saved to: {path}") + + @classmethod + def load(cls, path: str) -> 'SimulationLog': + """Load log from JSON file.""" + with open(path) as f: + data = json.load(f) + + steps = [] + for s in data["steps"]: + # Convert tuple strings back to tuples + if isinstance(s.get("agent_position"), list): + s["agent_position"] = tuple(s["agent_position"]) + if isinstance(s.get("new_position"), list): + s["new_position"] = tuple(s["new_position"]) + if isinstance(s.get("parsed_action_args"), list): + s["parsed_action_args"] = tuple(s["parsed_action_args"]) + if s.get("path"): + s["path"] = [tuple(p) for p in s["path"]] + steps.append(SimulationStep(**s)) + + return cls(metadata=data["metadata"], steps=steps) + + def get_agent_steps(self, agent_name: str) -> List[SimulationStep]: + """Get all steps for a specific agent.""" + return [s for s in self.steps if s.agent_id == agent_name] + + def get_turn_steps(self, turn: int) -> List[SimulationStep]: + """Get all steps from a specific turn.""" + return [s for s in self.steps if s.turn == turn] + + def summary(self) -> str: + """Generate a summary of the simulation.""" + lines = [ + f"Simulation Summary", + f"==================", + f"Total turns: {self.metadata.get('total_turns', 'unknown')}", + f"Total steps: {len(self.steps)}", + f"Agents: {', '.join(self.metadata.get('agent_names', []))}", + f"", + ] + + # Per-agent stats + for agent_name in self.metadata.get('agent_names', []): + agent_steps = self.get_agent_steps(agent_name) + successes = sum(1 for s in agent_steps if s.result_success) + lines.append(f"{agent_name}:") + lines.append(f" Actions: {len(agent_steps)}") + lines.append(f" Successful: {successes}") + if agent_steps: + final = agent_steps[-1] + final_pos = final.new_position or final.agent_position + lines.append(f" Final position: {final_pos}") + lines.append(f" Final room: {final.room}") + lines.append("") + + return "\n".join(lines) + + +class TurnOrchestrator: + """ + Orchestrates multi-turn simulation. 
+
+    Handles:
+    - Turn sequencing
+    - Perspective switching
+    - LLM queries
+    - Action execution
+    - Simulation logging
+    """
+
+    def __init__(self, grid, fov_layer, world: WorldGraph, agents: list,
+                 screenshot_dir: str, llm_query_fn: Callable):
+        """
+        Initialize orchestrator.
+
+        Args:
+            grid: mcrfpy.Grid instance
+            fov_layer: Color layer for FOV rendering
+            world: WorldGraph instance
+            agents: List of Agent objects
+            screenshot_dir: Directory for screenshots
+            llm_query_fn: Function(agent, screenshot_path, context) -> str
+        """
+        self.grid = grid
+        self.fov_layer = fov_layer
+        self.world = world
+        self.agents = agents
+        self.screenshot_dir = screenshot_dir
+        self.llm_query_fn = llm_query_fn
+
+        self.executor = ActionExecutor(grid)
+        self.turn_number = 0
+        self.steps: List[SimulationStep] = []
+
+        os.makedirs(screenshot_dir, exist_ok=True)
+
+    def run_turn(self) -> List[SimulationStep]:
+        """
+        Execute one full turn (all agents act once).
+
+        Returns list of SimulationSteps for this turn.
+        """
+        import mcrfpy
+
+        self.turn_number += 1
+        turn_steps = []
+
+        print(f"\n{'='*60}")
+        print(f"TURN {self.turn_number}")
+        print("=" * 60)
+
+        for agent in self.agents:
+            step = self._run_agent_turn(agent)
+            turn_steps.append(step)
+            self.steps.append(step)
+
+        return turn_steps
+
+    def run_simulation(self, max_turns: int = 10,
+                       stop_condition: Callable = None) -> SimulationLog:
+        """
+        Run complete simulation.
+
+        Args:
+            max_turns: Maximum number of turns to run
+            stop_condition: Optional callable(orchestrator) -> bool
+                Returns True to stop simulation early
+
+        Returns:
+            SimulationLog with all steps
+        """
+        print(f"\nStarting simulation: max {max_turns} turns")
+        print(f"Agents: {[a.name for a in self.agents]}")
+        print("=" * 60)
+
+        for turn in range(max_turns):
+            self.run_turn()
+
+            # Check stop condition
+            if stop_condition and stop_condition(self):
+                print(f"\nStop condition met at turn {self.turn_number}")
+                break
+
+        # Create log
+        log = SimulationLog(
+            metadata={
+                "total_turns": self.turn_number,
+                "num_agents": len(self.agents),
+                "agent_names": [a.name for a in self.agents],
+                "timestamp": datetime.now().isoformat(),
+                "world_rooms": list(self.world.rooms.keys()),
+                "screenshot_dir": self.screenshot_dir,
+            },
+            steps=self.steps
+        )
+
+        return log
+
+    def _run_agent_turn(self, agent) -> SimulationStep:
+        """Execute one agent's turn."""
+        import mcrfpy
+        from mcrfpy import automation
+
+        print(f"\n--- {agent.name}'s Turn ---")
+        print(f"Position: {agent.pos} | Room: {agent.current_room}")
+
+        # Switch perspective
+        self._switch_perspective(agent)
+        mcrfpy.step(0.016)
+
+        # Screenshot
+        screenshot_path = os.path.join(
+            self.screenshot_dir,
+            f"turn{self.turn_number}_{agent.name.lower()}.png"
+        )
+        automation.screenshot(screenshot_path)
+
+        # Build context
+        visible_agents = self._get_visible_agents(agent)
+        context = agent.get_context(visible_agents + [agent])
+
+        # Query LLM
+        llm_response = self.llm_query_fn(agent, screenshot_path, context)
+
+        # Parse and execute
+        action = parse_action(llm_response)
+        result = self.executor.execute(agent, action)
+
+        # Log output
+        status = "SUCCESS" if result.success else "FAILED"
+        print(f"  Action: {action.type.value} {action.args}")
+        print(f"  Result: {status} - {result.message}")
+
+        # Build step record
+        step = SimulationStep(
+            turn=self.turn_number,
+            agent_id=agent.name,
+            agent_position=agent.pos,
+            room=agent.current_room,
+            perception={
+                "location": context["location"],
+                "available_actions": context["available_actions"],
+            },
+            llm_response=llm_response,
+            parsed_action_type=action.type.value,
+            parsed_action_args=action.args,
+            result_success=result.success,
+            result_message=result.message,
+            new_position=result.new_position,
+            path=result.path
+        )
+
+        return step
+
+    def _switch_perspective(self, agent):
+        """Switch grid view to agent's perspective."""
+        import mcrfpy
+
+        self.fov_layer.fill(mcrfpy.Color(0, 0, 0, 255))
+        self.fov_layer.apply_perspective(
+            entity=agent.entity,
+            visible=mcrfpy.Color(0, 0, 0, 0),
+            discovered=mcrfpy.Color(40, 40, 60, 180),
+            unknown=mcrfpy.Color(0, 0, 0, 255)
+        )
+        agent.entity.update_visibility()
+
+        px, py = agent.pos
+        self.grid.center = (px * 16 + 8, py * 16 + 8)
+
+    def _get_visible_agents(self, observer) -> list:
+        """Get agents visible to observer based on FOV."""
+        visible = []
+        for agent in self.agents:
+            if agent.name == observer.name:
+                continue
+            ax, ay = agent.pos
+            if self.grid.is_in_fov(ax, ay):
+                visible.append(agent)
+        return visible
+
+    def get_agent_positions(self) -> Dict[str, tuple]:
+        """Get current positions of all agents."""
+        return {a.name: a.pos for a in self.agents}
+
+    def agents_in_same_room(self) -> bool:
+        """Check if all agents are in the same room."""
+        rooms = [a.current_room for a in self.agents]
+        return len(set(rooms)) == 1
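Because `SimulationLog` round-trips through plain JSON, a saved run can be analyzed offline without McRogueFace or the VLLM server. A small sketch against the demo's default log path, using only the `SimulationLog` API defined above (assumes a completed run of `3_multi_turn_demo.py`):

# Offline analysis sketch -- reads the log written by the multi-turn demo.
from turn_orchestrator import SimulationLog

log = SimulationLog.load("/tmp/vllm_multi_turn/simulation_log.json")
print(log.summary())

# Per-turn success rate, using only fields defined on SimulationStep
for turn in range(1, log.metadata.get("total_turns", 0) + 1):
    steps = log.get_turn_steps(turn)
    ok = sum(1 for s in steps if s.result_success)
    print(f"Turn {turn}: {ok}/{len(steps)} actions succeeded")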