# File listing metadata: 546 lines, 18 KiB, Python.
import csv
import heapq
import itertools
import os
import time
from abc import ABC, abstractmethod
from collections import deque
from typing import List, Tuple, Optional, Dict, Set
class Cell:
    """A single square of the maze grid.

    Instances keep the default identity-based equality and hashing, so
    they can be stored in sets and used as dictionary keys.
    """

    def __init__(self, x: int, y: int):
        """Create an open, unmarked cell at column ``x`` and row ``y``."""
        self.x, self.y = x, y
        # Role flags: a new cell is neither a wall, the start, nor the exit.
        self.is_wall = self.is_start = self.is_exit = False
        # Cost added by weighted searches when stepping onto this cell.
        self.weight = 1

    def is_passable(self) -> bool:
        """Return ``True`` unless the cell is a wall."""
        if self.is_wall:
            return False
        return True
class Maze:
    """Rectangular grid of cells with optional start and exit cells.

    ``cells`` is indexed as ``cells[y][x]``. The constructor leaves it
    empty; the grid has to be assigned after construction.
    """

    def __init__(self, width: int = 0, height: int = 0):
        """Store the grid dimensions; grid, start and exit begin unset."""
        self.width, self.height = width, height
        self.cells: List[List[Cell]] = []
        self.start: Optional[Cell] = None
        self.exit: Optional[Cell] = None

    def get_cell(self, x: int, y: int) -> Optional[Cell]:
        """Return the cell at ``(x, y)``, or ``None`` if out of bounds."""
        outside = not (0 <= x < self.width and 0 <= y < self.height)
        if outside:
            return None
        return self.cells[y][x]

    def get_neighbors(self, cell: Cell) -> List[Cell]:
        """Return the passable cells orthogonally adjacent to ``cell``.

        Candidates are examined in the fixed order y-1, y+1, x-1, x+1;
        out-of-bounds positions and walls are left out.
        """
        offsets = ((0, -1), (0, 1), (-1, 0), (1, 0))
        candidates = (
            self.get_cell(cell.x + dx, cell.y + dy) for dx, dy in offsets
        )
        return [found for found in candidates if found and found.is_passable()]
class MazeBuilder(ABC):
    """Abstract interface for objects that construct a maze from a file."""

    @abstractmethod
    def build_from_file(self, filename: str) -> Maze:
        """Build and return the maze described by the file ``filename``."""
        ...
class TextFileMazeBuilder(MazeBuilder):
    """Builds a maze from a plain-text grid, one character per cell.

    Legend: ``#`` is a wall, ``S`` the start, ``E`` the exit, and a digit
    is an open cell whose weight is that digit. Any other character
    leaves the cell at its defaults (open, weight 1).
    """

    def build_from_file(self, filename: str) -> Maze:
        """Read ``filename`` and return the corresponding maze.

        The grid is as wide as the longest row. Shorter rows are padded
        on the right with default (open) cells. If ``S`` or ``E`` occurs
        more than once, the last occurrence becomes ``maze.start`` /
        ``maze.exit``.

        Raises:
            ValueError: if the file contains no ``S`` or no ``E``.
            OSError: propagated from ``open`` when the file cannot be read.
        """
        with open(filename, 'r', encoding='utf-8') as f:
            # Strip only the newline: leading/trailing spaces are open
            # cells and must not be thrown away.
            rows = [line.rstrip('\n') for line in f]

        height = len(rows)
        # Size the grid by the widest row so that no row is truncated.
        width = max((len(row) for row in rows), default=0)

        maze = Maze(width, height)
        maze.cells = [[Cell(x, y) for x in range(width)] for y in range(height)]

        for y, row in enumerate(rows):
            for x, ch in enumerate(row):
                cell = maze.cells[y][x]
                if ch == '#':
                    cell.is_wall = True
                elif ch == 'S':
                    cell.is_start = True
                    maze.start = cell
                elif ch == 'E':
                    cell.is_exit = True
                    maze.exit = cell
                elif ch.isdigit():
                    cell.weight = int(ch)

        if not maze.start or not maze.exit:
            raise ValueError("Лабиринт должен содержать старт (S) и выход (E)")

        return maze
class PathFindingStrategy(ABC):
    """Abstract interface shared by all path-finding algorithms."""

    @abstractmethod
    def find_path(self, maze: Maze, start: Cell, exit_cell: Cell) -> List[Cell]:
        """Return the cells of a path from ``start`` to ``exit_cell``."""
        ...
class BFSStrategy(PathFindingStrategy):
    """Breadth-first search; cell weights are ignored.

    After each ``find_path`` call ``visited_count`` holds the number of
    distinct cells discovered by that search, the start cell included.
    """

    visited_count: int = 0

    def find_path(self, maze: Maze, start: Cell, exit_cell: Cell) -> List[Cell]:
        """Return a path with the fewest cells from ``start`` to ``exit_cell``.

        Returns:
            The cells from ``start`` to ``exit_cell`` inclusive, or an
            empty list when the exit cannot be reached.
        """
        # ``parent`` doubles as the visited set: a cell is a key exactly
        # when it has already been discovered.
        parent = {start: None}
        queue = deque([start])
        path: List[Cell] = []

        while queue:
            current = queue.popleft()

            if current == exit_cell:
                path = self.reconstruct_path(parent, start, exit_cell)
                break

            for neighbor in maze.get_neighbors(current):
                if neighbor not in parent:
                    parent[neighbor] = current
                    queue.append(neighbor)

        self.visited_count = len(parent)
        return path

    def reconstruct_path(self, parent: Dict, start: Cell, exit_cell: Cell) -> List[Cell]:
        """Follow ``parent`` links back from ``exit_cell`` and return the path.

        The walk stops at the cell whose parent is ``None``; the result is
        ordered from that cell to ``exit_cell``.
        """
        path = []
        current = exit_cell
        while current is not None:
            path.append(current)
            current = parent[current]
        path.reverse()
        return path
class DFSStrategy(PathFindingStrategy):
    """Iterative depth-first search; the path found is not necessarily shortest.

    After each ``find_path`` call ``visited_count`` holds the number of
    distinct cells discovered by that search, the start cell included.
    """

    visited_count: int = 0

    def find_path(self, maze: Maze, start: Cell, exit_cell: Cell) -> List[Cell]:
        """Return some path from ``start`` to ``exit_cell``.

        Returns:
            The cells from ``start`` to ``exit_cell`` inclusive, or an
            empty list when the exit cannot be reached.
        """
        # Each cell is pushed at most once, so one parent link per cell
        # replaces carrying a full copy of the path with every stack entry.
        parent = {start: None}
        stack = [start]
        path: List[Cell] = []

        while stack:
            current = stack.pop()

            if current == exit_cell:
                node = current
                while node is not None:
                    path.append(node)
                    node = parent[node]
                path.reverse()
                break

            for neighbor in maze.get_neighbors(current):
                if neighbor not in parent:
                    parent[neighbor] = current
                    stack.append(neighbor)

        self.visited_count = len(parent)
        return path
class AStarStrategy(PathFindingStrategy):
    """A* search using cell weights as step costs.

    After each ``find_path`` call ``visited_count`` holds the number of
    distinct cells that received a cost estimate, the start cell included.
    """

    visited_count: int = 0

    def heuristic(self, a: Cell, b: Cell) -> int:
        """Return the Manhattan distance between ``a`` and ``b``."""
        # NOTE(review): assumes every cell weight is at least 1, so this
        # never overestimates the remaining cost -- TODO confirm.
        return abs(a.x - b.x) + abs(a.y - b.y)

    def find_path(self, maze: Maze, start: Cell, exit_cell: Cell) -> List[Cell]:
        """Return a path from ``start`` to ``exit_cell``.

        The cost of a step is the ``weight`` of the cell being entered.

        Returns:
            The cells from ``start`` to ``exit_cell`` inclusive, or an
            empty list when the exit cannot be reached.
        """
        # Heap entries are (priority, sequence, cell). The sequence number
        # breaks ties between equal priorities, so the heap never has to
        # compare two cells, which define no ordering.
        sequence = itertools.count()
        open_set = [(self.heuristic(start, exit_cell), next(sequence), start)]
        came_from = {}
        g_score = {start: 0}
        path: List[Cell] = []

        while open_set:
            _, _, current = heapq.heappop(open_set)

            if current == exit_cell:
                path = self.reconstruct_path(came_from, start, exit_cell)
                break

            for neighbor in maze.get_neighbors(current):
                tentative_g = g_score[current] + neighbor.weight

                if neighbor not in g_score or tentative_g < g_score[neighbor]:
                    came_from[neighbor] = current
                    g_score[neighbor] = tentative_g
                    priority = tentative_g + self.heuristic(neighbor, exit_cell)
                    heapq.heappush(open_set, (priority, next(sequence), neighbor))

        self.visited_count = len(g_score)
        return path

    def reconstruct_path(self, came_from: Dict, start: Cell, exit_cell: Cell) -> List[Cell]:
        """Follow ``came_from`` links from ``exit_cell`` back to ``start``.

        Returns the cells ordered from ``start`` to ``exit_cell``.
        """
        path = []
        current = exit_cell
        while current != start:
            path.append(current)
            current = came_from[current]
        path.append(start)
        path.reverse()
        return path
class DijkstraStrategy(PathFindingStrategy):
    """Dijkstra's algorithm using cell weights as step costs.

    After each ``find_path`` call ``visited_count`` holds the number of
    distinct cells that received a distance, the start cell included.
    """

    visited_count: int = 0

    def find_path(self, maze: Maze, start: Cell, exit_cell: Cell) -> List[Cell]:
        """Return a minimum-cost path from ``start`` to ``exit_cell``.

        The cost of a step is the ``weight`` of the cell being entered.

        Returns:
            The cells from ``start`` to ``exit_cell`` inclusive, or an
            empty list when the exit cannot be reached.
        """
        # Heap entries are (distance, sequence, cell). The sequence number
        # breaks ties between equal distances, so the heap never has to
        # compare two cells, which define no ordering.
        sequence = itertools.count()
        pq = [(0, next(sequence), start)]
        distances = {start: 0}
        parent = {start: None}
        path: List[Cell] = []

        while pq:
            dist, _, current = heapq.heappop(pq)

            if current == exit_cell:
                path = self.reconstruct_path(parent, start, exit_cell)
                break

            # Stale entry: a shorter route to this cell was queued later.
            if dist > distances[current]:
                continue

            for neighbor in maze.get_neighbors(current):
                new_dist = dist + neighbor.weight

                if neighbor not in distances or new_dist < distances[neighbor]:
                    distances[neighbor] = new_dist
                    parent[neighbor] = current
                    heapq.heappush(pq, (new_dist, next(sequence), neighbor))

        self.visited_count = len(distances)
        return path

    def reconstruct_path(self, parent: Dict, start: Cell, exit_cell: Cell) -> List[Cell]:
        """Follow ``parent`` links back from ``exit_cell`` and return the path.

        The walk stops at the cell whose parent is ``None``; the result is
        ordered from that cell to ``exit_cell``.
        """
        path = []
        current = exit_cell
        while current is not None:
            path.append(current)
            current = parent[current]
        path.reverse()
        return path
class SearchStats:
    """Plain record holding the measurements of one search run."""

    def __init__(self, time_ms: float, visited_cells: int, path_length: int, path: List[Cell] = None):
        """Store every argument unchanged under the attribute of the same name."""
        self.time_ms, self.visited_cells = time_ms, visited_cells
        self.path_length, self.path = path_length, path
class MazeSolver:
    """Runs a path-finding strategy on a maze and reports to observers."""

    def __init__(self, maze: Maze, strategy: PathFindingStrategy):
        """Bind the solver to ``maze`` and ``strategy``; no observers yet."""
        self.maze = maze
        self.strategy = strategy
        self.observers = []

    def set_strategy(self, strategy: PathFindingStrategy):
        """Replace the strategy used by later ``solve`` calls."""
        self.strategy = strategy

    def add_observer(self, observer):
        """Register an object whose ``update(event, data)`` gets notified."""
        self.observers.append(observer)

    def notify_observers(self, event: str, data=None):
        """Call ``update(event, data)`` on every registered observer."""
        for observer in self.observers:
            observer.update(event, data)

    def solve(self) -> SearchStats:
        """Search from ``maze.start`` to ``maze.exit`` and return statistics.

        Observers receive ``"search_started"`` before the search and
        ``"search_finished"`` with the resulting stats after it. Only the
        strategy's ``find_path`` call is timed.
        """
        self.notify_observers("search_started")

        start_time = time.perf_counter()
        path = self.strategy.find_path(self.maze, self.maze.start, self.maze.exit)
        end_time = time.perf_counter()

        time_ms = (end_time - start_time) * 1000
        visited_cells = self.count_visited_cells()
        path_length = len(path)

        stats = SearchStats(time_ms, visited_cells, path_length, path)

        self.notify_observers("search_finished", stats)

        return stats

    def count_visited_cells(self) -> int:
        """Return the visited-cell count published by the current strategy.

        The solver keeps no visited set of its own. A strategy may expose
        the number through a ``visited_count`` attribute; a strategy
        without that attribute is reported as 0.
        """
        return getattr(self.strategy, "visited_count", 0)
class Observer(ABC):
    """Abstract receiver of event notifications."""

    @abstractmethod
    def update(self, event: str, data=None):
        """Handle the notification ``event`` with its optional ``data``."""
        ...
class ConsoleView(Observer):
    """Observer that prints search progress and the maze to the console."""

    def __init__(self):
        """Start without a maze; ``set_maze`` must be called before rendering."""
        self.maze = None
        self.current_path = None

    def set_maze(self, maze: Maze):
        """Select the maze that ``render`` draws."""
        self.maze = maze

    def update(self, event: str, data=None):
        """React to ``"search_started"`` and ``"search_finished"`` events.

        For ``"search_finished"`` ``data`` must have a ``path`` attribute.
        Other events are ignored.
        """
        if event == "search_finished":
            self.display_path(data.path)
        elif event == "search_started":
            print("\nПоиск пути начат...")

    def display_path(self, path: List[Cell]):
        """Print the search outcome and, if a path exists, the maze with it."""
        if not path:
            print("\nПуть не найден!")
            return

        print(f"\nПуть найден! Длина: {len(path)} шагов")
        self.render(path)

    def render(self, path: List[Cell] = None):
        """Print the maze row by row, optionally overlaying ``path``.

        Symbols: ``S`` start, ``E`` exit, ``*`` path cell, ``#`` wall,
        space for any other cell. Start and exit take precedence over the
        path marker so both ends of the path stay identifiable.
        """
        if not self.maze:
            return

        on_path = set(path) if path else set()

        for y in range(self.maze.height):
            symbols = []
            for x in range(self.maze.width):
                cell = self.maze.get_cell(x, y)
                if cell.is_start:
                    symbols.append('S')
                elif cell.is_exit:
                    symbols.append('E')
                elif cell in on_path:
                    symbols.append('*')
                elif cell.is_wall:
                    symbols.append('#')
                else:
                    symbols.append(' ')
            print(''.join(symbols))
class Player:
    """Tracks the player's current cell and the cells occupied before it."""

    def __init__(self, start_cell: Cell):
        """Place the player on ``start_cell`` with an empty move history."""
        self.current, self.history = start_cell, []

    def move_to(self, cell: Cell):
        """Move onto ``cell`` if it is passable.

        The previously occupied cell is pushed onto ``history``.

        Returns:
            ``True`` when the move happened, ``False`` when ``cell`` is
            falsy or not passable.
        """
        if not cell or not cell.is_passable():
            return False
        self.history.append(self.current)
        self.current = cell
        return True

    def undo(self):
        """Step back to the most recent cell in ``history``.

        Returns:
            ``True`` when a step was undone, ``False`` when the history
            is empty.
        """
        if not self.history:
            return False
        self.current = self.history.pop()
        return True
class Command(ABC):
    """Abstract reversible action."""

    @abstractmethod
    def execute(self) -> bool:
        """Perform the action and report whether it took effect."""
        ...

    @abstractmethod
    def undo(self):
        """Revert the effect of ``execute``."""
        ...
class MoveCommand(Command):
    """Moves a player one cell in a W/A/S/D direction and can undo it."""

    # Direction key -> (dx, dy); y grows downwards, so 'W' is y - 1.
    _OFFSETS = {'W': (0, -1), 'S': (0, 1), 'A': (-1, 0), 'D': (1, 0)}

    def __init__(self, player: Player, direction: str, maze: Maze):
        """Prepare a move of ``player`` in ``direction`` within ``maze``."""
        self.player = player
        self.direction = direction
        self.maze = maze
        # Cell the player left; set only by a successful execute().
        self.previous_position = None

    def execute(self) -> bool:
        """Attempt the move.

        Returns:
            ``True`` if the player moved. ``False`` if the direction is
            not one of W/A/S/D, the target is outside the maze, or the
            target is not passable; the player is then left untouched.
        """
        offset = self._OFFSETS.get(self.direction)
        if offset is None:
            # An unknown direction must not count as a zero-length move.
            return False

        origin = self.player.current
        dx, dy = offset
        target = self.maze.get_cell(origin.x + dx, origin.y + dy)

        # Player.move_to() performs the passability check itself.
        if target is None or not self.player.move_to(target):
            return False

        self.previous_position = origin
        return True

    def undo(self):
        """Put the player back where it was before a successful ``execute``.

        Does nothing if the command never succeeded or was already undone.
        """
        if self.previous_position is None:
            return

        # Player.move_to() pushed the origin onto the player's history;
        # drop that entry so the history matches the restored position.
        history = self.player.history
        if history and history[-1] is self.previous_position:
            history.pop()

        self.player.current = self.previous_position
        # Guard against undoing the same move twice.
        self.previous_position = None
def generate_test_mazes():
    """Write the built-in sample mazes to ``mazes/<name>.txt``.

    The ``mazes`` directory is created if missing. Each file holds the
    maze rows joined by newlines, and one confirmation line is printed
    per file.
    """
    samples = {
        "tiny": [
            "########",
            "#S #",
            "# #### #",
            "# E #",
            "########",
        ],
        "empty": [
            "########",
            "#S #",
            "# #",
            "# E#",
            "########",
        ],
        "no_exit": [
            "########",
            "#S #",
            "# #### #",
            "# # #",
            "########",
        ],
        "weighted": [
            "########",
            "#S2 #",
            "# 5#3 #",
            "# 2 E #",
            "########",
        ],
    }

    os.makedirs("mazes", exist_ok=True)

    for label in samples:
        filename = f"mazes/{label}.txt"
        content = '\n'.join(samples[label])
        with open(filename, 'w', encoding='utf-8') as out:
            out.write(content)
        print(f"Создан лабиринт: {filename}")
def run_experiments(runs: int = 5):
    """Benchmark every strategy on every sample maze and save a CSV report.

    Each maze/strategy pair is solved ``runs`` times. The averaged time,
    visited-cell count and path length are printed and written to
    ``maze_results.csv``.

    A maze that cannot be loaded is reported and skipped. A strategy that
    fails on a maze is reported separately and skipped, so the remaining
    strategies still run on that maze.

    Args:
        runs: number of repetitions per maze/strategy pair (at least 1).

    Raises:
        ValueError: if ``runs`` is less than 1.
    """
    if runs < 1:
        raise ValueError("runs must be at least 1")

    strategies = {
        "BFS": BFSStrategy(),
        "DFS": DFSStrategy(),
        "A*": AStarStrategy(),
        "Dijkstra": DijkstraStrategy(),
    }

    mazes_list = ["tiny", "empty", "no_exit", "weighted"]
    results = []
    builder = TextFileMazeBuilder()

    for maze_name in mazes_list:
        filename = f"mazes/{maze_name}.txt"

        # Only loading is guarded here, so a solver failure is never
        # misreported as a load error.
        try:
            maze = builder.build_from_file(filename)
        except (OSError, ValueError) as e:
            print(f"Ошибка загрузки {maze_name}: {e}")
            continue

        print(f"\nТестирование лабиринта: {maze_name}")

        for strategy_name, strategy in strategies.items():
            print(f" Стратегия: {strategy_name}")

            # Best-effort boundary: report the failing strategy and go on.
            try:
                samples = [MazeSolver(maze, strategy).solve() for _ in range(runs)]
            except Exception as e:
                print(f" Ошибка стратегии {strategy_name}: {e}")
                continue

            avg_time = sum(s.time_ms for s in samples) / runs
            avg_visited = sum(s.visited_cells or 0 for s in samples) / runs
            avg_path_len = sum(s.path_length for s in samples) / runs

            results.append([
                maze_name, strategy_name, avg_time, avg_visited, avg_path_len
            ])

            print(f" Время: {avg_time:.3f} мс, Посещено: {avg_visited:.1f}, Путь: {avg_path_len:.1f}")

    with open("maze_results.csv", 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(["Лабиринт", "Стратегия", "Время_мс", "Посещено_клеток", "Длина_пути"])
        writer.writerows(results)

    print("\nРезультаты сохранены в maze_results.csv")
def interactive_mode():
    """Solve a user-chosen maze, then let the user walk it manually.

    Prompts for a maze file and a strategy, runs the search with a console
    view attached and prints its statistics. Then starts a W/A/S/D loop in
    which ``Z`` undoes the most recent successful move and ``Q`` quits.
    Any exception is reported as a single error line.
    """
    print("\n=== ИНТЕРАКТИВНЫЙ РЕЖИМ ===")
    filename = input("Введите имя файла с лабиринтом: ")

    try:
        builder = TextFileMazeBuilder()
        maze = builder.build_from_file(filename)

        print("\nВыберите стратегию:")
        print("1. BFS (кратчайший путь)")
        print("2. DFS (быстрый, не обязательно кратчайший)")
        print("3. A* (оптимальный с эвристикой)")
        print("4. Dijkstra (для взвешенных лабиринтов)")

        choice = input("Ваш выбор (1-4): ")

        strategies = {
            '1': BFSStrategy(),
            '2': DFSStrategy(),
            '3': AStarStrategy(),
            '4': DijkstraStrategy(),
        }

        if choice not in strategies:
            print("Неверный выбор!")
            return

        solver = MazeSolver(maze, strategies[choice])
        view = ConsoleView()
        view.set_maze(maze)
        solver.add_observer(view)

        stats = solver.solve()

        print("\nСтатистика:")
        print(f"Время выполнения: {stats.time_ms:.3f} мс")
        print(f"Длина пути: {stats.path_length}")

        input("\nНажмите Enter для ручного режима...")

        player = Player(maze.start)
        # Successfully executed moves, newest last. Undo must act on the
        # command that actually moved the player, not on a fresh one.
        executed = []

        while player.current != maze.exit:
            os.system('cls' if os.name == 'nt' else 'clear')
            view.render()
            print(f"\nТекущая позиция: ({player.current.x}, {player.current.y})")
            print("Управление: W/A/S/D для движения, Z для отмены, Q для выхода")

            cmd = input("> ").strip().upper()

            if cmd == 'Q':
                break
            elif cmd == 'Z':
                if executed:
                    executed.pop().undo()
                    print("Отмена последнего хода")
                else:
                    print("Нет ходов для отмены")
            elif cmd in ('W', 'A', 'S', 'D'):
                command = MoveCommand(player, cmd, maze)
                if command.execute():
                    executed.append(command)
                    print("Перемещение выполнено")
                else:
                    print("Нельзя пройти в этом направлении")
            else:
                print("Неизвестная команда")

            if player.current == maze.exit:
                print("\nПОЗДРАВЛЯЮ! ВЫ НАШЛИ ВЫХОД!")
                break

    except Exception as e:
        print(f"Ошибка: {e}")
def main():
    """Print the banner, write the sample mazes and run the chosen mode."""
    rule = "=" * 80
    print(rule)
    print("ПОИСК ВЫХОДА ИЗ ЛАБИРИНТА")
    print("Применённые паттерны: Builder, Strategy, Observer, Command")
    print(rule)

    generate_test_mazes()

    print("\n1. Запустить эксперименты")
    print("2. Интерактивный режим")

    choice = input("\nВыберите режим (1-2): ")

    # Menu key -> entry point of the corresponding mode.
    modes = {'1': run_experiments, '2': interactive_mode}
    selected = modes.get(choice)
    if selected is None:
        print("Неверный выбор!")
        return
    selected()
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()