๐Ÿ”ฌ Controlling angr dynamic symbolic execution resources consumption

Intro

You all know, DSE can be explosive, and sometimes you need either to put a hard limit on resources consumption (in fact, you don’t want to risk losing your perfect windows placement by rebooting!), or you can’t afford to wait forever for symbolic execution because YOLO.

On our angr slack channel, multiple users have asked how to set a Timeout to their symbolic exploration, so, I’ve thought was a good idea to share here some snippets that can be useful when playing around with angr.

MemLimiter โš–๏ธ

The following ExplorationTechnique can be plugged into an instance of a SimulationManager to stop DSE when memory consumption hits critical levels.

class MemLimiter(angr.exploration_techniques.ExplorationTechnique):
    def __init__(self, max_mem, drop_errored):
        super(MemLimiter, self).__init__()
        self.max_mem = max_mem
        self.drop_errored = drop_errored
        self.process = psutil.Process(os.getpid())

    def step(self, simgr, stash='active', **kwargs):
        if psutil.virtual_memory().percent > 90 or (self.max_mem - 1) < self.memory_usage_psutil:
            simgr.move(from_stash='active', to_stash='out_of_memory')
            simgr.move(from_stash='deferred', to_stash='out_of_memory')

        simgr.drop(stash='deadended')
        simgr.drop(stash='avoid')
        simgr.drop(stash='found')
        if self.drop_errored:
            del simgr.errored[:]

        return simgr.step(stash=stash)

    @property
    def memory_usage_psutil(self):
        # return the memory usage in MB
        mem = self.process.memory_info().vms / float(2 ** 30)
        return mem

ExplosionDetector ๐Ÿ’ฅ

When the ExplosionDetector is plugged in a SimulationManager, it can be used to (1) trigger a timeout, (2) stop the execution when reaching a certain amount of generated SimState(s), and (3) nuking all the unconstrained SimState(s) in a SimulationManager.

class ExplosionDetector(ExplorationTechnique):
    def __init__(self, stashes=('active', 'deferred', 'errored', 'cut'), threshold=100):
        super(ExplosionDetector, self).__init__()
        self._stashes = stashes
        self._threshold = threshold
        self.timed_out = Event()
        self.timed_out_bool = False

    def step(self, simgr, stash='active', **kwargs):
        simgr = simgr.step(stash=stash, **kwargs)
        total = 0
        if len(simgr.unconstrained) > 0:
            l.debug("Nuking unconstrained")
            simgr.move(from_stash='unconstrained', to_stash='_Drop', filter_func=lambda _: True)
        if self.timed_out.is_set():
            l.critical("Timed out, %d states: %s" % (total, str(simgr)))
            self.timed_out_bool = True
            for st in self._stashes:
                if hasattr(simgr, st):
                    simgr.move(from_stash=st, to_stash='_Drop', filter_func=lambda _: True)
        for st in self._stashes:
            if hasattr(simgr, st):
                total += len(getattr(simgr, st))

        if total >= self._threshold:
            l.critical("State explosion detected, over %d states: %s" % (total, str(simgr)))
            for st in self._stashes:
                if hasattr(simgr, st):
                    simgr.move(from_stash=st, to_stash='_Drop', filter_func=lambda _: True)

        return simgr

In particular, I’ve been using the following snippet of code to set a timeout to my symbolic explorations by leveraging the field self.timed_out_bool inside the ExplosionDetector:


from threading import Event, Timer

'''
project: an angr Project object
state: state that will be used to start dse
'''
def dse_it(project, state):

    sm = project.factory.simgr(state)

    dfs = angr.exploration_techniques.DFS()
    sm.use_technique(dfs)
    ed = ExplosionDetector(threshold=1000)
    sm.use_technique(ed)

    # Callback for timeout that sets the boolean inside the 
    # ExplosionDetector
    def timeout():
        l.warning("Timeout during DSE has been reached.")
        ed.timed_out.set()
        ed.timed_out_bool = True

    # The actual timer
    timer = Timer(300, timeout)
    timer.start()

    while len(sm.active) > 0 and not ed.timed_out_bool:
        new_state = sm.active[0]
        state_addr = new_state.solver.eval(new_state.regs.pc)
        sm.step()

    # Cancel the timer
    timer.cancel()
    
    # Check ExplosionDetector and do something if we timed out.
    if ed.timed_out_bool:
        l.fatal("DSE timed out")

This might not be the cleanest way to do it, but, it does the job!