Nested Hierarchy Workflow
Tasks flow from company to departments and teams, illustrating resource constraints across two levels.
Level: Beginner
Emergence
Discover how interactions between parts can create properties and behaviors that no individual component possesses alone.
Explore Emergence
simulation.py
Work flowing through a nested organisation
Picture a company with several departments, each hosting a number of teams. New tasks choose a department and then a team at random so we can watch how the workload spreads across the hierarchy.
from tys import probe, progress
Simulate work distribution across departments and teams.
def simulate(cfg: dict):
    """Simulate work distribution across departments and teams.

    Tasks arrive one after another, separated by exponentially distributed
    gaps.  Each task picks a department and then one of its teams uniformly
    at random, queues for that team's single-capacity resource, and holds it
    for an exponentially distributed service time.

    Args:
        cfg: Configuration mapping.  Required keys are ``num_departments``,
            ``teams_per_department``, ``tasks_total``, ``arrival_interval``
            and ``service_time``.  ``seed`` is optional and defaults to 42.
            ``arrival_interval`` and ``service_time`` are used as divisors
            (mean of the exponential draws), so they must be non-zero.

    Returns:
        A dict ``{"tasks_processed": n}`` where ``n`` is the number of tasks
        completed.  When ``tasks_total`` is zero or negative nothing is
        simulated and ``n`` is 0.

    Raises:
        KeyError: If a required configuration key is missing.
    """
    import random

    random.seed(cfg.get("seed", 42))

    num_departments = cfg["num_departments"]
    teams_per_department = cfg["teams_per_department"]
    tasks_total = cfg["tasks_total"]
    arrival_interval = cfg["arrival_interval"]
    service_time = cfg["service_time"]

    # With no tasks the generator below would never start a task, `done`
    # would never be triggered, and `env.run(until=done)` would raise
    # RuntimeError once the event queue drains.  Report an empty run instead.
    if tasks_total <= 0:
        return {"tasks_processed": 0}

    import simpy

    env = simpy.Environment()
    processed = 0

    class Team:
        """A team that works on one task at a time."""

        def __init__(self, name: str):
            self.name = name
            # capacity=1: tasks assigned to the same team queue up.
            self.resource = simpy.Resource(env, capacity=1)
            self.done = 0  # tasks completed by this team

    class Department:
        """A department owning ``teams_per_department`` teams."""

        def __init__(self, name: str):
            self.name = name
            self.teams = [
                Team(f"{name}-Team{i}") for i in range(teams_per_department)
            ]
            self.done = 0  # tasks completed by any team of this department

    departments = [Department(f"Dept{i}") for i in range(num_departments)]

    # Triggered by whichever task brings the processed count to tasks_total;
    # its value becomes the simulation result.
    done = env.event()

    def task_proc(name: str):
        """Process a single task using a random team."""
        nonlocal processed
        dept = random.choice(departments)
        team = random.choice(dept.teams)
        with team.resource.request() as req:
            yield req  # wait until the team is free
            duration = random.expovariate(1.0 / service_time)
            yield env.timeout(duration)
            team.done += 1
            dept.done += 1
            processed += 1
            # probe/progress come from the file-level `tys` import;
            # presumably they record a sample and report percent complete
            # -- confirm against the tys API.
            probe(team.name, env.now, team.done)
            probe(dept.name, env.now, dept.done)
            progress(int(100 * processed / tasks_total))
            if processed >= tasks_total and not done.triggered:
                done.succeed({"tasks_processed": processed})

    def generator():
        """Generate tasks over time."""
        for i in range(tasks_total):
            env.process(task_proc(f"Task{i}"))
            yield env.timeout(random.expovariate(1.0 / arrival_interval))

    env.process(generator())
    env.run(until=done)
    return done.value
def requirements():
    """Return the package requirements of this simulation.

    The result maps ``"builtin"`` and ``"external"`` to lists of package
    specifiers.  NOTE(review): presumably "builtin" names packages shipped
    with the runtime and "external" names pinned packages to install --
    confirm against the runner that consumes this mapping.
    """
    builtin_packages = ["micropip", "pyyaml"]
    external_packages = ["simpy==4.1.1"]
    return dict(builtin=builtin_packages, external=external_packages)
config.yaml
num_departments: 2
teams_per_department: 2
tasks_total: 20
arrival_interval: 1.0
service_time: 2.0
seed: 42