Nested Hierarchy Workflow

A nested hierarchy simulation where tasks flow across company departments.

Level: Beginner

workflow, capacity, resource-management, throughput, bottleneck

  • Flows: arrivals, service
  • Probes: Dept0, Dept1, Dept0-Team0, Dept0-Team1, Dept1-Team0, Dept1-Team1

simulation.py

Work flowing through a nested organisation

Picture a company with several departments, each hosting a number of teams. New tasks choose a department and then a team at random so we can watch how the workload spreads across the hierarchy.


from tys import probe, progress

def simulate(cfg: dict):
    """Simulate work distribution across departments and teams."""
    import random
    import simpy

    env = simpy.Environment()

    random.seed(cfg.get("seed", 42))

    num_departments = cfg["num_departments"]
    teams_per_department = cfg["teams_per_department"]
    tasks_total = cfg["tasks_total"]
    arrival_interval = cfg["arrival_interval"]
    service_time = cfg["service_time"]

    processed = 0

    class Team:
        def __init__(self, name: str):
            self.name = name
            self.resource = simpy.Resource(env, capacity=1)
            self.done = 0

    class Department:
        def __init__(self, name: str):
            self.name = name
            self.teams = [Team(f"{name}-Team{i}") for i in range(teams_per_department)]
            self.done = 0

    departments = [Department(f"Dept{i}") for i in range(num_departments)]
    done = env.event()

    # Process a single task using a random team.

    def task_proc(name: str):
        nonlocal processed
        dept = random.choice(departments)
        team = random.choice(dept.teams)
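        with team.resource.request() as req:
            # Wait until the team's single slot (capacity=1) is free.
            yield req
            # expovariate takes a rate, so 1 / service_time gives a mean of service_time.
            duration = random.expovariate(1.0 / service_time)
            yield env.timeout(duration)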
        team.done += 1
        dept.done += 1
        processed += 1
        probe(team.name, env.now, team.done)
        probe(dept.name, env.now, dept.done)
        progress(int(100 * processed / tasks_total))
        if processed >= tasks_total and not done.triggered:
            done.succeed({"tasks_processed": processed})

    # Generate tasks over time.

    def generator():
        for i in range(tasks_total):
            env.process(task_proc(f"Task{i}"))
            yield env.timeout(random.expovariate(1.0 / arrival_interval))

    env.process(generator())
    env.run(until=done)
    return done.value


def requirements():
    return {
        "builtin": ["micropip", "pyyaml"],
        "external": ["simpy==4.1.1"],
    }
Default.yaml
num_departments: 2
teams_per_department: 2
tasks_total: 20
arrival_interval: 1.0
service_time: 2.0
seed: 42
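
As a rough sanity check on these defaults, the offered load per team can be worked out by hand. The sketch below assumes tasks split evenly across teams in the long run (which the uniform random choice gives on average when every department has the same number of teams) and treats each team as a single server. The function name `per_team_utilization` is illustrative and not part of the simulation.

```python
def per_team_utilization(cfg: dict) -> float:
    """Approximate long-run utilisation of each team (assumes an even split)."""
    teams = cfg["num_departments"] * cfg["teams_per_department"]
    arrival_rate = 1.0 / cfg["arrival_interval"]  # tasks per time unit, overall
    per_team_rate = arrival_rate / teams          # tasks per time unit, per team
    return per_team_rate * cfg["service_time"]    # fraction of time a team is busy


# Values copied from the default configuration above.
defaults = {
    "num_departments": 2,
    "teams_per_department": 2,
    "arrival_interval": 1.0,
    "service_time": 2.0,
}
rho = per_team_utilization(defaults)
print(f"per-team utilisation: {rho:.2f}")  # per-team utilisation: 0.50
```

A value below 1 suggests that queues stay bounded on average. With only 20 tasks, a single run can still deviate noticeably from that average.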
Charts (Default)

Dept0-Team1

Samples: 5 @ 0.56–28.74
Values: min 1.00, mean 3.00, median 3.00, max 5.00, σ 1.41

Dept0

Samples: 13 @ 0.56–28.74
Values: min 1.00, mean 7.00, median 7.00, max 13.00, σ 3.74

Dept1-Team0

Samples: 7 @ 2.66–24.66
Values: min 1.00, mean 4.00, median 4.00, max 7.00, σ 2.00

Dept1

Samples: 7 @ 2.66–24.66
Values: min 1.00, mean 4.00, median 4.00, max 7.00, σ 2.00

Dept0-Team0

Samples: 8 @ 2.81–24.62
Values: min 1.00, mean 4.50, median 4.50, max 8.00, σ 2.29
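
Because each probe records a running count, a probe with n samples holds the values 1 to n, and the summary figures follow directly from that sequence. The sketch below reproduces them with the standard library. The helper name `summarize` is hypothetical, and the use of the population standard deviation is an inference from the numbers above (it matches all five charts), not something the page states.

```python
import statistics


def summarize(values):
    """Return the summary fields listed under each chart."""
    return {
        "min": min(values),
        "mean": statistics.mean(values),
        "median": statistics.median(values),
        "max": max(values),
        "sigma": statistics.pstdev(values),  # population standard deviation (assumed)
    }


# A probe that fired 5 times with a running count yields the values 1..5.
stats = summarize(list(range(1, 6)))
print(", ".join(f"{name} {value:.2f}" for name, value in stats.items()))
# min 1.00, mean 3.00, median 3.00, max 5.00, sigma 1.41
```

The same call with `range(1, 14)` gives a mean of 7.00 and σ of 3.74, matching the Dept0 chart. Dept1-Team1 is listed as a probe but has no chart here, which is consistent with it completing no tasks in this run: Dept1 and Dept1-Team0 both show 7 samples over the same time range.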

Final Results (Default)

Metric           Value
tasks_processed  20.00
FAQ
How is workload recorded across teams?
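Each time a task completes, the done counters for its team and its department are incremented, and probe records the new counts against the current simulation time, so the distribution across the hierarchy is visible.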
How is a task assigned to a team?
For each new task, the task_proc function selects a random department and then a random team within that department.
When does the simulation finish?
Once the number of processed tasks reaches tasks_total, the done event is triggered and env.run(until=done) returns.
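
The two-stage assignment described in the FAQ can be tried in isolation, without SimPy. This is a minimal sketch, not the simulation's own code: `pick_team` and `layout` are hypothetical names, and a local `random.Random` instance is used so the global seed is left untouched.

```python
import random
from collections import Counter


def pick_team(rng, departments):
    """Pick a department uniformly, then a team uniformly within it."""
    dept = rng.choice(list(departments))
    team = rng.choice(departments[dept])
    return dept, team


# Hypothetical layout mirroring the default config: 2 departments x 2 teams.
layout = {f"Dept{d}": [f"Dept{d}-Team{t}" for t in range(2)] for d in range(2)}

rng = random.Random(42)  # local generator; does not affect random.seed()
counts = Counter(pick_team(rng, layout)[1] for _ in range(20))
for team in sorted(counts):
    print(team, counts[team])
```

With equal-sized departments every team has the same chance of being chosen (1 in 4 here), but 20 draws are few enough that the counts usually come out uneven. If departments had different numbers of teams, this rule would favour teams in the smaller departments.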