Nested Hierarchy Workflow

Tasks flow from the company to its departments and teams, illustrating resource constraints across two levels.

Level: Beginner

Tags: workflow, hierarchy, multilevel, capacity, tasks

  • Flows: arrivals, service
  • Probes: Dept0, Dept1, Dept0-Team0, Dept0-Team1, Dept1-Team0, Dept1-Team1
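Each probe records the cumulative number of tasks completed by that department or team, stamped with the simulation time at which the count changed.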

simulation.py

Work flowing through a nested organisation

Picture a company with several departments, each hosting a number of teams. New tasks choose a department and then a team at random so we can watch how the workload spreads across the hierarchy.
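As a rough sanity check (treating each team as an M/M/1 queue under the default config.yaml below): arrivals average one task per 1.0 time unit and are split uniformly over 2 × 2 = 4 teams, so each team sees about 0.25 tasks per time unit. With a mean service time of 2.0, per-team utilisation is near 0.25 × 2.0 = 0.5, and each team should complete roughly 20 / 4 = 5 of the 20 tasks, though with so few tasks the actual split will be noisy.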


from tys import probe, progress

def simulate(cfg: dict):
    """Simulate work distribution across departments and teams."""
    import random
    import simpy

    env = simpy.Environment()

    random.seed(cfg.get("seed", 42))

    num_departments = cfg["num_departments"]
    teams_per_department = cfg["teams_per_department"]
    tasks_total = cfg["tasks_total"]
    arrival_interval = cfg["arrival_interval"]
    service_time = cfg["service_time"]

    processed = 0

    class Team:
        def __init__(self, name: str):
            self.name = name
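            # One worker per team: a capacity-1 resource serialises its tasks.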
            self.resource = simpy.Resource(env, capacity=1)
            self.done = 0

    class Department:
        def __init__(self, name: str):
            self.name = name
            self.teams = [Team(f"{name}-Team{i}") for i in range(teams_per_department)]
            self.done = 0

    departments = [Department(f"Dept{i}") for i in range(num_departments)]
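    # Completion event: succeeds once every task has been processed.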
    done = env.event()

    def task_proc(name: str):
        """Process a single task using a random team."""
        nonlocal processed
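        # Two-level routing: pick a department, then one of its teams, uniformly at random.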
        dept = random.choice(departments)
        team = random.choice(dept.teams)
        with team.resource.request() as req:
            yield req
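            # Hold the team for an exponentially distributed service time (mean = service_time).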
            duration = random.expovariate(1.0 / service_time)
            yield env.timeout(duration)
        team.done += 1
        dept.done += 1
        processed += 1
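        # Report cumulative completions for the team and its department.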
        probe(team.name, env.now, team.done)
        probe(dept.name, env.now, dept.done)
        progress(int(100 * processed / tasks_total))
        if processed >= tasks_total and not done.triggered:
            done.succeed({"tasks_processed": processed})

    def generator():
        """Generate tasks over time."""
        for i in range(tasks_total):
            env.process(task_proc(f"Task{i}"))
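            # Exponential inter-arrival gaps (mean = arrival_interval), i.e. Poisson arrivals.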
            yield env.timeout(random.expovariate(1.0 / arrival_interval))

    env.process(generator())
    env.run(until=done)
    return done.value


def requirements():
    return {
        "builtin": ["micropip", "pyyaml"],
        "external": ["simpy==4.1.1"],
    }

config.yaml
num_departments: 2
teams_per_department: 2
tasks_total: 20
arrival_interval: 1.0
service_time: 2.0
seed: 42
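
To poke at the model outside the hosting environment, one option is to stub the tys module before importing simulation.py. The sketch below is illustrative only: run_local.py and the samples list are made-up names, the probe(name, t, value) and progress(pct) signatures are inferred from the calls above, and it assumes simulation.py and config.yaml are saved alongside it with simpy and pyyaml installed.

# run_local.py -- hypothetical stand-alone runner, not part of the example files.
import sys
import types

# Collect (probe name, simulation time, value) tuples emitted by the model.
samples = []

# Register a stub "tys" module so `from tys import probe, progress` succeeds.
stub = types.ModuleType("tys")
stub.probe = lambda name, t, value: samples.append((name, t, value))
stub.progress = lambda pct: None  # progress updates are ignored locally
sys.modules["tys"] = stub

import yaml
import simulation  # the simulation.py listing above

with open("config.yaml") as fh:
    cfg = yaml.safe_load(fh)

result = simulation.simulate(cfg)
print(result)        # e.g. {'tasks_processed': 20}
print(samples[-3:])  # last few probe samples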