# Exercise 7: State Space Search

In [3]:
# Install any missing packages in case one of the imports below fails
import unittest
import sys
import heapq
from collections import deque

## Implement succ, goal and expand

In [10]:
class JarSearchProblem:
    """Water-pouring puzzle expressed as a state space search problem.

    A state is a tuple holding the current fill level of every jar. The only
    action is pouring one jar into another until the source is empty or the
    destination is full.

    Args:
        start_state: Fill levels the search starts from.
        capacity: Maximum fill level of each jar, in the same order as the
            state tuple. Defaults to the 8/5/3 puzzle.
        target: Amount that must be present in at least one jar for a state
            to count as a goal. Defaults to 4.
    """

    def __init__(
        self,
        start_state: tuple[int, ...],
        capacity: tuple[int, ...] = (8, 5, 3),
        target: int = 4,
    ):
        self.CAPACITY = tuple(capacity)
        self.target = target
        self.start_state = start_state

    def succ(self, s: tuple[int, ...]) -> list[tuple[int, ...]]:
        """Return every state reachable from ``s`` with a single pouring.

        Successors are ordered by source jar index, then by destination jar
        index.
        """
        successors = []
        for from_index, from_value in enumerate(s):
            # Nothing can be poured out of an empty jar.
            if from_value == 0:
                continue
            for to_index, to_value in enumerate(s):
                # A jar cannot be poured into itself.
                if from_index == to_index:
                    continue
                # Free space left in the destination jar.
                space = self.CAPACITY[to_index] - to_value
                if space == 0:
                    continue
                # The transfer is limited by whichever runs out first:
                # the source content or the destination space.
                transit = min(space, from_value)
                # Copy into a list so the incoming tuple stays untouched.
                new_state = list(s)
                new_state[to_index] += transit
                new_state[from_index] -= transit
                successors.append(tuple(new_state))
        return successors

    def goal(self, s: tuple[int, ...]) -> bool:
        """Return True if any jar in ``s`` holds exactly the target amount."""
        return self.target in s
    
def expand(n, succ):
    """Build the child nodes of search node ``n``.

    A node is a triple ``(cost, state, previous_states)``. Every child costs
    one step more than ``n`` and carries its own copy of the history extended
    by the state of ``n``.
    """
    state = n[1]
    return [(n[0] + 1, successor, n[2] + [state]) for successor in succ(state)]

Test by running the following cell

In [11]:
class TestSucc(unittest.TestCase):
    """Tests for JarSearchProblem.succ / goal and the two-argument expand()."""

    def testSucc1(self):
        node = (0, (8, 0, 0), [])
        search_problem = JarSearchProblem((0, 0, 0))
        gold_successors = [(1, (3, 5, 0), [(8, 0, 0)]), (1, (5, 0, 3), [(8, 0, 0)])]
        # Compare the whole list so missing or extra successors fail the test.
        self.assertEqual(expand(node, search_problem.succ), gold_successors)

    def testSucc2(self):
        node = (2, (3, 2, 3), [])
        search_problem = JarSearchProblem((0, 0, 0))
        gold_successors = [
            (3, (0, 5, 3), [(3, 2, 3)]),
            (3, (5, 0, 3), [(3, 2, 3)]),
            (3, (6, 2, 0), [(3, 2, 3)]),
            (3, (3, 5, 0), [(3, 2, 3)]),
        ]
        self.assertEqual(expand(node, search_problem.succ), gold_successors)

    def testGoal1(self):
        search_problem = JarSearchProblem((0, 0, 0))
        self.assertFalse(search_problem.goal((3, 2, 3)))

    def testGoal2(self):
        search_problem = JarSearchProblem((0, 0, 0))
        self.assertTrue(search_problem.goal((4, 4, 0)))

# Run every TestCase currently defined in this module. argv=[''] supplies a
# dummy program name so unittest does not parse the host process's sys.argv
# (e.g. a notebook kernel's); exit=False keeps unittest from calling sys.exit().
unittest.main(argv=[''], verbosity=2, exit=False)
# Remove the class so later unittest.main() calls do not run these tests again.
del TestSucc

testGoal1 (__main__.TestSucc) ... ok
testGoal2 (__main__.TestSucc) ... ok
testSucc1 (__main__.TestSucc) ... ok
testSucc2 (__main__.TestSucc) ... ok

----------------------------------------------------------------------
Ran 4 tests in 0.009s

OK


## Implement BFS algorithm

In [220]:
# Example nodes: (0, (8,0,0), []), (1, (3,5,0), [(8,0,0)])
def bfs(search_problem):
    """Breadth-first search over the state space of ``search_problem``.

    Args:
        search_problem: Object providing ``start_state``, ``succ(state)``
            (returns the successor states) and ``goal(state)``.

    Returns:
        ``(goal_state, number_of_steps, path)`` for the first goal node taken
        from the queue, where ``path`` lists the states from the start state
        to the goal state inclusive; ``False`` if the queue runs empty
        without reaching a goal.
    """
    start = search_problem.start_state
    # deque gives O(1) removal at the left end, which a FIFO queue needs.
    frontier = deque([(0, start, [])])  # nodes: (cost, state, previous states)
    # States that were already queued; prevents revisiting and cycling.
    seen = {start}
    while frontier:
        cost, state, history = frontier.popleft()
        if search_problem.goal(state):
            return state, cost, history + [state]
        # Child nodes are built here rather than through the module-level
        # expand(): that name is rebound further down in this file to a
        # three-argument variant, which would break this function.
        path_to_children = history + [state]
        for successor in search_problem.succ(state):
            if successor not in seen:
                seen.add(successor)
                frontier.append((cost + 1, successor, path_to_children))
    return False

Test by running the following cell

In [221]:
# Test cases (test only proper execution of the method. Does not test for complexity).
class TestBFS(unittest.TestCase):
    """Checks bfs() on the jar puzzle, starting with only the first jar full."""

    def testBFS1(self):
        expected_path = [(8, 0, 0), (3, 5, 0), (3, 2, 3), (6, 2, 0), (6, 0, 2), (1, 5, 2), (1, 4, 3)]
        final_state, pour_count, path = bfs(JarSearchProblem((8, 0, 0)))
        self.assertEqual(final_state, (1, 4, 3))
        self.assertEqual(pour_count, 6)
        self.assertEqual(path, expected_path)

# Run every TestCase currently defined in this module. argv=[''] supplies a
# dummy program name so unittest does not parse the host process's sys.argv
# (e.g. a notebook kernel's); exit=False keeps unittest from calling sys.exit().
unittest.main(argv=[''], verbosity=2, exit=False)
# Remove the class so later unittest.main() calls do not run these tests again.
del TestBFS

testBFS1 (__main__.TestBFS) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.002s

OK


## Implement Iterative Deepening Search algorithm

In [222]:
def _depth_limited_search(search_problem, k):
    """Depth-first search that never expands a node at depth ``k`` or deeper.

    Returns:
        A pair ``(result, cutoff)``. ``result`` is
        ``(goal_state, depth, path)`` or ``False``. ``cutoff`` is True if at
        least one non-goal node was left unexpanded because of the limit,
        i.e. a larger limit might still uncover new states.
    """
    start = search_problem.start_state
    stack = [(0, start, [])]  # nodes: (depth, state, previous states)
    # Shallowest depth at which each state has been queued so far. A plain
    # "seen" set is not enough here: a state first met deep in one branch
    # must still be explored when it is met again closer to the start,
    # otherwise solutions within the limit can be missed.
    shallowest = {start: 0}
    cutoff = False
    while stack:
        depth, state, history = stack.pop()
        if search_problem.goal(state):
            return (state, depth, history + [state]), cutoff
        if depth >= k:
            cutoff = True
            continue
        # Child nodes are built here rather than through the module-level
        # expand(): that name is rebound further down in this file to a
        # three-argument variant, which would break this function.
        path_to_children = history + [state]
        for successor in search_problem.succ(state):
            known = shallowest.get(successor)
            if known is None or depth + 1 < known:
                shallowest[successor] = depth + 1
                stack.append((depth + 1, successor, path_to_children))
    return False, cutoff


def limited_dfs(search_problem, k: int):
    """Depth-limited DFS: search for a goal using at most ``k`` steps.

    Args:
        search_problem: Object providing ``start_state``, ``succ(state)``
            and ``goal(state)``.
        k: Maximum depth (number of steps from the start state).

    Returns:
        ``(goal_state, depth, path)`` with ``path`` running from the start
        state to the goal state inclusive, or ``False`` if no goal is found
        within the limit.
    """
    result, _ = _depth_limited_search(search_problem, k)
    return result


def ids(search_problem):
    """Iterative deepening search: depth-limited DFS with limits 0, 1, 2, ...

    Returns:
        ``(goal_state, depth, path)`` for the first limit at which a goal is
        found, or ``False`` once a pass finishes without hitting the depth
        limit (the reachable state space is exhausted). On an infinite state
        space without a goal this does not terminate.
    """
    k = 0
    while True:
        result, cutoff = _depth_limited_search(search_problem, k)
        if result is not False:
            return result
        if not cutoff:
            # Nothing was cut off, so a deeper pass cannot find anything new.
            return False
        k += 1
            

In [224]:
# Test cases (test only proper execution of the method. Does not test for complexity).
class TestIDS(unittest.TestCase):
    """Checks ids() on the jar puzzle, starting with only the first jar full."""

    def testIDS1(self):
        expected_path = [(8, 0, 0), (3, 5, 0), (3, 2, 3), (6, 2, 0), (6, 0, 2), (1, 5, 2), (1, 4, 3)]
        final_state, pour_count, path = ids(JarSearchProblem((8, 0, 0)))
        self.assertEqual(final_state, (1, 4, 3))
        self.assertEqual(pour_count, 6)
        self.assertEqual(path, expected_path)

# Run every TestCase currently defined in this module. argv=[''] supplies a
# dummy program name so unittest does not parse the host process's sys.argv
# (e.g. a notebook kernel's); exit=False keeps unittest from calling sys.exit().
unittest.main(argv=[''], verbosity=2, exit=False)
# Remove the class so later unittest.main() calls do not run these tests again.
del TestIDS

testIDS1 (__main__.TestIDS) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.003s

OK


## Implement $A^*$ algorithm

In [16]:
# Priority queue similar to exercise sheet 5 - Dijkstra
class PriorityQueue:
    """Minimum priority queue backed by ``heapq``.

    Entries are kept in ``_heap`` as ``(priority, item)`` tuples.
    """

    def __init__(self):
        self._heap = []

    def push(self, item, priority):
        """Insert ``item`` with the given ``priority``."""
        entry = (priority, item)
        heapq.heappush(self._heap, entry)

    def pop(self):
        """Remove and return the smallest ``(priority, item)`` entry.

        Raises:
            IndexError: If the queue is empty.
        """
        if self.is_empty():
            raise IndexError("pop from an empty priority queue")
        return heapq.heappop(self._heap)

    def decrease_key(self, item, new_priority):
        """Set the priority of the first entry holding ``item``.

        Does nothing when ``item`` is not in the queue.
        """
        position = next(
            (idx for idx, entry in enumerate(self._heap) if entry[1] == item),
            None,
        )
        if position is None:
            return
        self._heap[position] = (new_priority, self._heap[position][1])
        # The edited entry may now violate the heap order; rebuild the heap.
        heapq.heapify(self._heap)

    def is_empty(self):
        """Return True if the queue holds no entries."""
        return not self._heap

In [17]:
# Heuristic value per state name; GraphSearchProblem.h looks states up here.
STATE_TO_HEURISTIC = {"s": 9, "t": 1, "x": 0, "y": 4, "z": 13}

class GraphSearchProblem:
    """Directed, weighted graph posed as a search problem.

    Args:
        vertices: Iterable of states; each gets an empty adjacency list.
        start_state: State the search starts from.
        goal_state: State the search should reach.
        heuristic: Optional mapping ``{state: estimate}`` used by ``h``.
            When omitted, the module-level ``STATE_TO_HEURISTIC`` table is
            used.
    """

    def __init__(self, vertices, start_state, goal_state, heuristic=None):
        # Adjacency lists in the form {<from_state>: [(<to_state>, cost), ...]}
        # (e.g., {A: [(B, 1), (C, 3)]}).
        self.adj = {v: [] for v in vertices}
        self.start_state = start_state
        self.goal_state = goal_state
        self._heuristic = heuristic

    def add_edge(self, from_state: object, to_state: tuple[object, int]) -> None:
        """Append a directed edge to the adjacency list of ``from_state``.

        ``to_state`` is a tuple of the form ``(state, cost)``. Raises
        ``KeyError`` if ``from_state`` is not one of the vertices.
        """
        self.adj[from_state].append(to_state)

    def goal(self, state) -> bool:
        """Return True if ``state`` equals the goal state."""
        return state == self.goal_state

    def succ(self, state: object) -> list[tuple[object, int]]:
        """Return the ``(successor_state, cost)`` pairs leaving ``state``."""
        return self.adj[state]

    def h(self, state):
        """Return the heuristic value of ``state``."""
        # The global table is looked up at call time, so it only has to exist
        # once h() is actually used without a custom heuristic.
        table = STATE_TO_HEURISTIC if self._heuristic is None else self._heuristic
        return table[state]

def expand(n, succ, heuristic_function):
    """Build the child nodes of the A* node ``n``.

    ``n`` is ``(f, state)``. Subtracting the heuristic of ``state`` from ``f``
    and adding the edge cost yields the path cost of each successor; the
    result is a list of ``(path_cost, successor_state)`` pairs.

    Note: this definition replaces the two-argument ``expand`` defined
    earlier in this file.
    """
    state = n[1]
    return [
        (n[0] - heuristic_function(state) + step_cost, successor)
        for successor, step_cost in succ(state)
    ]

# A* implementation
def a_star(search_problem):
    """A* search on ``search_problem``.

    The problem object must provide ``start_state``, ``goal(state)``,
    ``succ(state)`` (returning ``(successor, cost)`` pairs) and ``h(state)``.

    Returns:
        ``(goal_state, path_cost)`` when a goal node is taken from the queue,
        ``False`` when the queue runs empty first.
    """
    start = search_problem.start_state
    # Cheapest known path cost from the start to every state seen so far,
    # in the form {<state>: cost} (e.g., {s: 0}).
    best_cost = {start: 0}
    frontier = PriorityQueue()
    frontier.push(start, search_problem.h(start))
    while not frontier.is_empty():
        node = frontier.pop()  # (f, state) with f = path cost + heuristic
        state = node[1]
        if search_problem.goal(state):
            return state, best_cost[state]
        for path_cost, successor in expand(node, search_problem.succ, search_problem.h):
            f_value = path_cost + search_problem.h(successor)
            # First time this state is reached: record it and queue it.
            if successor not in best_cost:
                best_cost[successor] = path_cost
                frontier.push(successor, f_value)
                continue
            # Already known and the new path is no improvement: ignore it.
            if not path_cost < best_cost[successor]:
                continue
            best_cost[successor] = path_cost
            # Cheaper path found: update the queued entry, or queue the state
            # again if it was already taken out.
            still_queued = any(entry[1] == successor for entry in frontier._heap)
            if still_queued:
                frontier.decrease_key(successor, f_value)
            else:
                frontier.push(successor, f_value)
    return False

The following two code blocks provide examples that should help you understand the given functions.

In [18]:
# Test cases
class TestAStar(unittest.TestCase):
    """Runs a_star() from "s" to each possible goal of one fixed graph."""

    # Directed, weighted edges shared by every test, in insertion order:
    # (from_state, to_state, cost).
    EDGES = (
        ("s", "t", 10),
        ("s", "y", 5),
        ("t", "s", 10),
        ("t", "x", 1),
        ("t", "y", 3),
        ("x", "t", 1),
        ("x", "y", 9),
        ("x", "z", 6),
        ("y", "s", 5),
        ("y", "t", 3),
        ("y", "x", 9),
        ("y", "z", 2),
        ("z", "y", 2),
        ("z", "x", 6),
    )

    def _make_problem(self, goal_state, vertices=("s", "t", "x", "y", "z")):
        """Build the shared graph with start state "s" and the given goal."""
        problem = GraphSearchProblem(list(vertices), "s", goal_state)
        for from_state, to_state, cost in self.EDGES:
            problem.add_edge(from_state, (to_state, cost))
        return problem

    def testAStar1(self):
        self.assertEqual(a_star(self._make_problem("x")), ("x", 9))

    def testAStar2(self):
        self.assertEqual(a_star(self._make_problem("t")), ("t", 8))

    def testAStar3(self):
        self.assertEqual(a_star(self._make_problem("y")), ("y", 5))

    def testAStar4(self):
        self.assertEqual(a_star(self._make_problem("z")), ("z", 7))

    def testAStar5(self):
        # "a" is a vertex without any incoming edge, so it cannot be reached.
        problem = self._make_problem("a", vertices=("a", "s", "t", "x", "y", "z"))
        self.assertEqual(a_star(problem), False)
        
# Run every TestCase currently defined in this module. argv=[''] supplies a
# dummy program name so unittest does not parse the host process's sys.argv
# (e.g. a notebook kernel's); exit=False keeps unittest from calling sys.exit().
unittest.main(argv=[''], verbosity=2, exit=False)
# Remove the class so later unittest.main() calls do not run these tests again.
del TestAStar

testAStar1 (__main__.TestAStar) ... ok
testAStar2 (__main__.TestAStar) ... ok
testAStar3 (__main__.TestAStar) ... ok
testAStar4 (__main__.TestAStar) ... ok
testAStar5 (__main__.TestAStar) ... ok

----------------------------------------------------------------------
Ran 5 tests in 0.010s

OK
