# Fishery Simulation (Beginner) A fishery simulation of stocks, flows, and feedback loops managing fish populations. ID: fishery Tags: population, resource-management, sustainability, management, ecosystem, stocks-flows, reinforcing-loop, balancing-loop, renewable-resource, quota-policy Stocks: population Flows: births, quota Feedback Loops: reproduction (reinforcing), quota (balancing) Probes: population, quota, gap_to_capacity, extracted_total Context: - Example page: /examples/fishery - Playground: /playground?example=fishery - Collection page: /example/collections/population-dynamics - Tag pages: /example/tags/population, /example/tags/resource-management, /example/tags/sustainability, /example/tags/management, /example/tags/ecosystem, /example/tags/stocks-flows, /example/tags/reinforcing-loop, /example/tags/balancing-loop, /example/tags/renewable-resource, /example/tags/quota-policy ## Parameters ### Collapse (default) ```yaml initial_pop: 30 carrying_capacity: 100 growth_rate: 0.4 quota_fraction: 0.22 perception_delay: 7 sim_time: 300 ``` ### Stable ```yaml initial_pop: 30 carrying_capacity: 100 growth_rate: 0.4 quota_fraction: 0.21 perception_delay: 7 sim_time: 300 ``` ## Simulation Code ```python from tys import probe, progress def simulate(cfg: dict): import simpy env = simpy.Environment() population = cfg["initial_pop"] # starting fish in the lake capacity = cfg["carrying_capacity"] # ecological limit of the lake growth_rate = cfg["growth_rate"] # how quickly fish reproduce quota_fraction = cfg["quota_fraction"] # fraction of perceived stock harvested perception_delay = cfg["perception_delay"] # steps our information is behind sim_time = cfg["sim_time"] # length of the simulation history = [population] * perception_delay # ring buffer of past counts extracted_total = 0.0 # running total of fish taken done = env.event() def dynamics(): nonlocal population, extracted_total for t in range(sim_time): perceived = history[0] # what the managers believe quota = quota_fraction * perceived birth = growth_rate * population * (1 - population / capacity) population = max(population + birth - quota, 0) # update stock safely extracted_total += quota history.append(population) history.pop(0) # advance the perception window yield env.timeout(1) progress(100) done.succeed({ "final_population": population, "utilisation": population / capacity, "survived": population > 0.05 * capacity, "extracted_total": extracted_total }) env.process(dynamics()) def recorder(): while True: perceived = history[0] quota = quota_fraction * perceived probe("population", env.now, population) probe("quota", env.now, quota) probe("gap_to_capacity", env.now, capacity - population) probe("extracted_total", env.now, extracted_total) yield env.timeout(0.5) # half-step for smoother lines env.process(recorder()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: Why can quotas overshoot the population? A: Quota decisions look at a delayed perception of stock stored in a ring buffer, so real fish numbers may drop before managers react. Q: How is population growth calculated each step? A: Births follow logistic growth: growth_rate * population * (1 - population/carrying_capacity). Q: Which probe shows risk of collapse? A: gap_to_capacity tracks how far the population is from the lake's carrying capacity. --- # Antifragile System (Beginner) A simple antifragile system simulation where each failure reduces the probability of future failures. 
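The core loop is nothing more than multiplicative decay of the failure probability: every observed failure scales failure_rate down by (1 - learning_rate). A minimal standalone sketch of that rule, independent of the full SimPy model below (the function name and defaults here are illustrative only):

```python
import random

def learn_from_failures(failure_rate=0.3, learning_rate=0.1, iterations=50, seed=42):
    """Count failures while each failure shrinks the failure probability."""
    rng = random.Random(seed)
    failures = 0
    for _ in range(iterations):
        if rng.random() < failure_rate:
            failures += 1
            failure_rate *= 1 - learning_rate  # the system gets tougher after every failure
    return failures, failure_rate

print(learn_from_failures())  # exact numbers depend on the seed
```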
ID: antifragile-system Tags: reinforcing-loop, reliability Flows: failure Feedback Loops: learning from failure Probes: failure, failure_rate, total_failures Context: - Example page: /examples/antifragile-system - Playground: /playground?example=antifragile-system - Collection page: /example/collections/reliability-maintenance - Tag pages: /example/tags/reinforcing-loop, /example/tags/reliability ## Parameters ### Default (default) ```yaml failure_rate: 0.3 learning_rate: 0.1 iterations: 50 ``` ## Simulation Code ```python from tys import probe, progress def simulate(cfg: dict): import simpy import random env = simpy.Environment() failure_rate = cfg["failure_rate"] # probability of failure each step learning_rate = cfg["learning_rate"] # failure rate reduction after failure iterations = cfg["iterations"] done = env.event() def run(): nonlocal failure_rate failures = 0 for t in range(iterations): if random.random() < failure_rate: failures += 1 failure_rate *= 1 - learning_rate probe("failure", env.now, 1) else: probe("failure", env.now, 0) probe("failure_rate", env.now, failure_rate) probe("total_failures", env.now, failures) progress(int(100 * (t + 1) / iterations)) yield env.timeout(1) done.succeed({"failures": failures, "final_failure_rate": failure_rate}) env.process(run()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: How does failure reduce future failure probability? A: Each time a failure occurs the failure_rate is multiplied by (1 - learning_rate), lowering the chance of failing again. Q: Where is randomness used in this simulation? A: Each iteration compares random.random() to failure_rate to decide if a failure happens. Q: When does the run event resolve? A: After iterations steps complete the event succeeds with the failure count and final rate. --- # First-Price vs. Vickrey Auctions (Intermediate) Repeated sealed-bid auctions comparing first-price and Vickrey rules with learning agents. 
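The two rules differ only in what the winner pays: their own bid under first-price, the runner-up bid under Vickrey. A minimal sketch of that settlement step (the helper name is illustrative; ties here go to the lowest index, whereas the full model below breaks them at random):

```python
def settle(bids: list[float], rule: str) -> tuple[int, float]:
    """Return (winner index, payment) for one sealed-bid round."""
    winner = max(range(len(bids)), key=lambda i: bids[i])
    if rule == "first_price":
        payment = bids[winner]  # winner pays their own bid
    else:  # "vickrey": winner pays the second-highest bid
        payment = sorted(bids, reverse=True)[1] if len(bids) > 1 else 0.0
    return winner, payment

print(settle([0.9, 0.7, 0.5], "first_price"))  # (0, 0.9)
print(settle([0.9, 0.7, 0.5], "vickrey"))      # (0, 0.7)
```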
ID: auction-first-vickrey Tags: game-theory, auction, learning, incentive, simpy Probes: revenue, allocative_efficiency, truthful_bidding Context: - Example page: /examples/auction-first-vickrey - Playground: /playground?example=auction-first-vickrey - Collection page: /example/collections/game-dynamics - Tag pages: /example/tags/game-theory, /example/tags/auction, /example/tags/learning, /example/tags/incentive, /example/tags/simpy ## Parameters ### First Price (default) ```yaml num_agents: 4 num_rounds: 100 auction_rule: first_price seed: 42 ``` ### Vickrey ```yaml num_agents: 4 num_rounds: 100 auction_rule: vickrey seed: 42 ``` ## Simulation Code ```python from tys import probe, progress class Agent: """Bidder with regret-matching over a few shading options.""" def __init__(self, idx: int, rng): self.idx = idx self.rng = rng self.actions = [1.0, 0.8, 0.6] self.regret = [0.0] * len(self.actions) def _strategy(self): """Convert positive regrets into a probability distribution.""" positives = [max(r, 0.0) for r in self.regret] total = sum(positives) if total > 0: return [r / total for r in positives] return [1.0 / len(self.actions)] * len(self.actions) def choose_action(self) -> int: probs = self._strategy() r = self.rng.random() cumulative = 0.0 for i, p in enumerate(probs): cumulative += p if r <= cumulative: return i return len(self.actions) - 1 def update(self, value: float, bids: list[float], winner: int, payment: float, rule: str, chosen: int): chosen_payoff = value - payment if winner == self.idx else 0.0 for i, a in enumerate(self.actions): alt_bids = bids[:] alt_bids[self.idx] = value * a max_bid = max(alt_bids) winners = [j for j, b in enumerate(alt_bids) if b == max_bid] w = self.rng.choice(winners) if rule == "first_price": pay = alt_bids[w] else: sorted_bids = sorted(alt_bids, reverse=True) pay = sorted_bids[1] if len(sorted_bids) > 1 else 0.0 payoff = value - pay if w == self.idx else 0.0 regret = payoff - chosen_payoff if regret > 0: self.regret[i] += regret def simulate(cfg: dict): """Run repeated auctions and record summary metrics.""" import simpy import random env = simpy.Environment() num_agents = int(cfg["num_agents"]) num_rounds = int(cfg["num_rounds"]) rule = cfg["auction_rule"] # ``first_price`` or ``vickrey`` seed = int(cfg.get("seed", 1)) rng = random.Random(seed) agents = [Agent(i, rng) for i in range(num_agents)] revenue_total = 0.0 efficient_total = 0 done = env.event() def run(): nonlocal revenue_total, efficient_total for t in range(num_rounds): vals = [rng.random() for _ in range(num_agents)] bids = [] choices = [] for ag, val in zip(agents, vals): idx = ag.choose_action() choices.append(idx) bids.append(val * ag.actions[idx]) max_bid = max(bids) winners = [i for i, b in enumerate(bids) if b == max_bid] winner = rng.choice(winners) if rule == "first_price": payment = bids[winner] else: # Vickrey second-price sorted_bids = sorted(bids, reverse=True) payment = sorted_bids[1] if len(sorted_bids) > 1 else 0.0 revenue_total += payment if vals[winner] == max(vals): efficient_total += 1 truthful = sum(1 for idx in choices if agents[0].actions[idx] == 1.0) / num_agents probe("revenue", env.now, payment) probe("allocative_efficiency", env.now, efficient_total / (t + 1)) probe("truthful_bidding", env.now, truthful) for i, ag in enumerate(agents): ag.update(vals[i], bids, winner, payment, rule, choices[i]) progress(int(100 * (t + 1) / num_rounds)) yield env.timeout(1) done.succeed({ "avg_revenue": revenue_total / num_rounds, "efficiency": efficient_total / num_rounds, 
}) env.process(run()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: How do bidders adapt their bids over time? A: Each agent updates regrets for different bid multipliers after every round, making higher-regret strategies more likely next time. Q: What determines the auction winner? A: The highest bid wins; if several bids tie the simulation selects a random winner among them. Q: How is payment computed in the Vickrey version? A: The winner pays the second-highest bid, so bidding truthfully is a dominant strategy. --- # Bank Renege Simulation (Intermediate) A bank renege simulation where customers join a queue with limited patience and may leave before service if the wait is too long. ID: bank-renege Tags: queue, service, discrete-event, time-delay Stocks: queue Flows: served, reneged Probes: wait_time, served, reneged Context: - Example page: /examples/bank-renege - Playground: /playground?example=bank-renege - Collection page: /example/collections/scheduling-priority-queues - Tag pages: /example/tags/queue, /example/tags/service, /example/tags/discrete-event, /example/tags/time-delay ## Parameters ### Default (default) ```yaml seed: 42 num_customers: 50 arrival_interval: 5.0 time_in_bank: 12.0 min_patience: 1 max_patience: 3 ``` ## Simulation Code ```python import random from tys import probe, progress def simulate(cfg: dict): import simpy env = simpy.Environment() num_customers = cfg["num_customers"] arrival_interval = cfg["arrival_interval"] time_in_bank = cfg["time_in_bank"] min_patience = cfg["min_patience"] max_patience = cfg["max_patience"] random_seed = cfg.get("seed", 42) random.seed(random_seed) counter = simpy.Resource(env, capacity=1) served = 0 reneged = 0 done = env.event() def customer(name: str): nonlocal served, reneged arrive = env.now with counter.request() as req: patience = random.uniform(min_patience, max_patience) results = yield req | env.timeout(patience) wait = env.now - arrive probe("wait_time", env.now, wait) if req in results: service_time = random.expovariate(1.0 / time_in_bank) yield env.timeout(service_time) served += 1 probe("served", env.now, served) else: reneged += 1 probe("reneged", env.now, reneged) processed = served + reneged progress(100 * processed / num_customers) if processed >= num_customers: done.succeed({"served": served, "reneged": reneged}) def source(): for i in range(num_customers): env.process(customer(f"Customer{i:02d}")) t = random.expovariate(1.0 / arrival_interval) yield env.timeout(t) env.process(source()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: When do customers leave the queue? A: Each customer waits with a patience timeout; if it expires before they get service, the process records a reneged event. Q: How are arrival times generated? A: The source process draws delays from an exponential distribution based on arrival_interval. Q: What distribution controls service duration? A: Service times are random.expovariate with mean time_in_bank so some customers take longer. --- # Bathtub Fill and Drain (Beginner) A bathtub simulation illustrating stock and flow of water volume. 
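With constant inflow and drain rates the stock has a simple closed form, which is handy for sanity-checking the step-by-step model below; a minimal sketch (defaults mirror the example parameters):

```python
def bathtub_volume(minute: int, initial_volume=0, inflow_rate=8, drain_rate=3):
    """Water volume after `minute` steps: initial stock plus net flow, clamped at zero."""
    return max(0, initial_volume + (inflow_rate - drain_rate) * minute)

print(bathtub_volume(20))  # 100 litres with the default parameters
```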
ID: bathtub Tags: stocks-flows, capacity Stocks: water_volume Flows: inflow, outflow Probes: water_volume, inflow, outflow Context: - Example page: /examples/bathtub - Playground: /playground?example=bathtub - Tag pages: /example/tags/stocks-flows, /example/tags/capacity ## Parameters ### Default (default) ```yaml initial_volume: 0 inflow_rate: 8 drain_rate: 3 minutes: 20 ``` ## Simulation Code ```python from tys import probe, progress def simulate(cfg: dict): import simpy env = simpy.Environment() volume = cfg["initial_volume"] # water in tub (liters) inflow = cfg["inflow_rate"] # liters per minute entering drain = cfg["drain_rate"] # liters per minute leaving minutes = cfg["minutes"] done = env.event() def run(): nonlocal volume for m in range(minutes): volume += inflow - drain volume = max(0, volume) probe("water_volume", env.now, volume) probe("inflow", env.now, inflow) probe("outflow", env.now, drain) progress(int(100 * (m + 1) / minutes)) yield env.timeout(1) done.succeed({"final_volume": volume}) env.process(run()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: How is the water volume updated each minute? A: Volume increases by inflow and decreases by the drain rate while never dropping below zero. Q: What prevents negative volume when the drain exceeds inflow? A: The simulation clamps volume to at least zero after each update. Q: Where do the probe values come from? A: The run process emits water_volume, inflow and outflow every minute so the charts show how they change. --- # Car Service Simulation - SimPy Tutorial (Advanced) A car service simulation showing priority washing with features like interrupts. ID: car-service Tags: simpy, priority, service, maintenance, queue, interrupts, discrete-event, reliability, workflow Stocks: fuel_level, parts Probes: fuel_level, wash_q, parts Context: - Example page: /examples/car-service - Playground: /playground?example=car-service - Collection page: /example/collections/reliability-maintenance - Tag pages: /example/tags/simpy, /example/tags/priority, /example/tags/service, /example/tags/maintenance, /example/tags/queue, /example/tags/interrupts, /example/tags/discrete-event, /example/tags/reliability, /example/tags/workflow ## Parameters ### Default (default) ```yaml seed: 42 num_pumps: 2 wash_bays: 1 fuel_capacity: 1000 fuel_init: 800 fuel_range: - 20 - 60 fuel_time: 0.1 wash_time: 60 drive_time: 180 breakdown_after: - 60 - 120 repair_time: 90 arrival_interval: 45 restock_interval: 30 tanker_time: 120 audit_interval: 300 audit_time: 15 parts_capacity: 30 initial_parts: 15 part_types: - brake - filter - wiper vip_frequency: 5 num_cars: 25 ``` ## Simulation Code ```python import random from typing import Any, Callable, Dict, List from tys import probe, progress def simulate(cfg: Dict[str, Any]) -> Dict[str, int]: import simpy random.seed(cfg.get("seed", 42)) env = simpy.Environment() pump = simpy.Resource(env, capacity=cfg.get("num_pumps", 2)) wash = simpy.PriorityResource(env, capacity=cfg.get("wash_bays", 1)) fuel_capacity = cfg.get("fuel_capacity", 1000) fuel_tank = simpy.Container(env, init=cfg.get("fuel_init", 800), capacity=fuel_capacity) parts_store: simpy.FilterStore[str] = simpy.FilterStore( env, capacity=cfg.get("parts_capacity", 30) ) part_types: List[str] = cfg.get("part_types", ["brake", "filter", "wiper"]) for _ in range(cfg.get("initial_parts", 15)): parts_store.put(random.choice(part_types)) mechanic = simpy.PreemptiveResource(env, capacity=1) tanker_en_route = 
False cars_served = 0 sim_done = env.event() def record(name: str, getter: Callable[[], Any], interval: float = 0.5): while True: probe(name, env.now, getter()) yield env.timeout(interval) def refuel_car(): with pump.request() as req: yield req fuel_needed = random.randint(*cfg.get("fuel_range", (20, 60))) yield fuel_tank.get(fuel_needed) yield env.timeout(fuel_needed * cfg.get("fuel_time", 0.1)) def wash_car(priority: int): with wash.request(priority=priority) as req: yield req yield env.timeout(cfg.get("wash_time", 60)) def fetch_spare_part(): needed_part = random.choice(part_types) if needed_part in parts_store.items: # skip the errand when the part is out of stock; a filtered get would block forever yield parts_store.get(lambda p: p == needed_part) def repair_car(): with mechanic.request(priority=1) as req: yield req try: yield env.timeout(cfg.get("repair_time", 90)) except simpy.Interrupt: yield env.timeout(cfg.get("audit_time", 15)) def drive_round_trip(): breakdown = env.timeout(random.randint(*cfg.get("breakdown_after", (60, 120)))) drive_done = env.timeout(cfg.get("drive_time", 180)) outcome = yield env.any_of([breakdown, drive_done]) if breakdown in outcome: yield from repair_car() yield drive_done def car(idx: int): nonlocal cars_served vip = (idx % cfg.get("vip_frequency", 5) == 0) priority = 0 if vip else 1 yield from refuel_car() yield from wash_car(priority) yield from fetch_spare_part() yield from drive_round_trip() cars_served += 1 progress(int(100 * cars_served / cfg["num_cars"])) if cars_served >= cfg["num_cars"] and not sim_done.triggered: sim_done.succeed({"cars_serviced": cars_served}) def generator(): nonlocal tanker_en_route for i in range(cfg["num_cars"]): while fuel_tank.level < 0.10 * fuel_capacity and not tanker_en_route: # hold new arrivals until the restock process dispatches a tanker yield env.timeout(1) env.process(car(i)) inter = random.expovariate(1.0 / cfg.get("arrival_interval", 45)) yield env.timeout(inter) def restock(): nonlocal tanker_en_route while True: if fuel_tank.level < 0.50 * fuel_capacity and not tanker_en_route: tanker_en_route = True yield env.timeout(cfg.get("tanker_time", 120)) yield fuel_tank.put(fuel_capacity - fuel_tank.level) tanker_en_route = False if len(parts_store.items) < cfg["parts_capacity"] / 2: for _ in range(cfg["parts_capacity"] - len(parts_store.items)): parts_store.put(random.choice(part_types)) yield env.timeout(cfg.get("restock_interval", 30)) def authority(): while True: yield env.timeout(random.expovariate(1.0 / cfg.get("audit_interval", 300))) for req in list(mechanic.users): req.proc.interrupt("audit") env.process(generator()) env.process(restock()) env.process(authority()) env.process(record("fuel_level", lambda: fuel_tank.level)) env.process(record("wash_q", lambda: len(wash.queue))) env.process(record("parts", lambda: len(parts_store.items))) env.run(until=sim_done) return sim_done.value def requirements() -> Dict[str, List[str]]: return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: What triggers a fuel tanker delivery? A: If the fuel tank level falls below 10% of capacity and no tanker is en route, the generator pauses new arrivals until the restock process dispatches a tanker to refill the tank. Q: How are VIP cars prioritised for washing? A: Every vip_frequency-th car gets priority=0 in the wash queue so it jumps ahead of regular customers. Q: What happens during a mechanic audit? A: The authority process interrupts the process currently holding the mechanic; the repair handler catches the interrupt and waits out the audit delay.
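The audit in this example interrupts the repairing process directly via req.proc.interrupt; for comparison, a minimal standalone sketch of SimPy's built-in preemption on a PreemptiveResource, which achieves a similar effect (process names and durations here are illustrative):

```python
import simpy

def repair(env, mechanic):
    with mechanic.request(priority=1) as req:  # low priority, may be preempted
        yield req
        try:
            yield env.timeout(90)
            print(f"{env.now}: repair finished")
        except simpy.Interrupt as interrupt:
            print(f"{env.now}: repair preempted ({interrupt.cause})")

def audit(env, mechanic):
    yield env.timeout(30)
    with mechanic.request(priority=0, preempt=True) as req:  # higher priority wins the bay
        yield req
        yield env.timeout(15)
        print(f"{env.now}: audit done")

env = simpy.Environment()
mechanic = simpy.PreemptiveResource(env, capacity=1)
env.process(repair(env, mechanic))
env.process(audit(env, mechanic))
env.run()
```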
--- # Dynamic Difficulty Adjustment (Intermediate) Adaptive encounters that track win rate to keep players challenged. ID: dynamic-difficulty Tags: gameplay, reinforcing-loop, balancing-loop, adaptive, game Stocks: skill, difficulty, retention Flows: practice gains, difficulty adaptation Feedback Loops: skill improvement (reinforcing), challenge tuning (balancing) Probes: skill, difficulty, win_rate, frustration, retention Context: - Example page: /examples/dynamic-difficulty - Playground: /playground?example=dynamic-difficulty - Collection page: /example/collections/game-dynamics - Tag pages: /example/tags/gameplay, /example/tags/reinforcing-loop, /example/tags/balancing-loop, /example/tags/adaptive, /example/tags/game ## Parameters ### Default (default) ```yaml initial_skill: 5 skill_gain: 0.5 initial_difficulty: 5 adjust_rate: 0.3 target_win_rate: 0.7 memory: 5 frustration_thresh: 1.0 encounters: 50 seed: 42 ``` ## Simulation Code ```python from tys import probe, progress def simulate(cfg: dict): import simpy import random env = simpy.Environment() skill = cfg["initial_skill"] # starting player skill level skill_gain = cfg["skill_gain"] # practice boost each round difficulty = cfg["initial_difficulty"] # challenge level of encounters adjust_rate = cfg["adjust_rate"] # how quickly difficulty changes target_win = cfg["target_win_rate"] # desired win rate to hit flow memory = cfg["memory"] # number of recent outcomes to track frustr_thresh = cfg["frustration_thresh"] # time threshold before frustration encounters = cfg["encounters"] # total encounters simulated random.seed(cfg.get("seed", 1)) # deterministic runs for comparison history = [] # recent win/loss record retention = 1.0 # probability the player keeps playing done = env.event() def play(): nonlocal skill, difficulty, retention for n in range(encounters): p_win = skill / (skill + difficulty) win = random.random() < p_win time_to_victory = difficulty / skill frustration = max(0, time_to_victory - frustr_thresh) if not win: frustration += 1 retention *= max(0.0, 1 - 0.1 * frustration) history.append(1 if win else 0) if len(history) > memory: history.pop(0) win_rate = sum(history) / len(history) difficulty *= 1 + adjust_rate * (win_rate - target_win) skill += skill_gain probe("skill", env.now, skill) probe("difficulty", env.now, difficulty) probe("win_rate", env.now, win_rate) probe("frustration", env.now, frustration) probe("retention", env.now, retention) progress(int(100 * (n + 1) / encounters)) yield env.timeout(1) done.succeed({ "final_skill": skill, "final_difficulty": difficulty, "retention": retention }) env.process(play()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: How does the game adjust difficulty? A: After each encounter the difficulty is scaled by 1 + adjust_rate * (win_rate - target_win) based on recent results. Q: What factors contribute to player frustration? A: Long battle times and losses increase the frustration metric which reduces retention. Q: How is retention calculated? A: Retention is multiplied by max(0, 1 - 0.1 * frustration) each round to reflect players quitting over time. --- # Gacha Box (Beginner) Simulates drawing items from a gacha box with configurable drop rates and a pity system. 
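The pity system is the only stateful part of a draw: once the counter of draws since the last top-rarity hit reaches pity_after, the next pull is forced to the top rarity. A minimal sketch of a single pull (the function name is illustrative):

```python
import random

def pull(rarities: dict, draws_since_top: int, pity_after: int, rng: random.Random):
    """One gacha draw; returns (rarity, updated pity counter)."""
    names = list(rarities)
    top = names[-1]  # most coveted rarity, as in the full model below
    if pity_after and draws_since_top >= pity_after:
        return top, 0  # guaranteed drop resets the counter
    pick = rng.choices(names, weights=list(rarities.values()))[0]
    return pick, 0 if pick == top else draws_since_top + 1

rng = random.Random(42)
print(pull({"common": 0.8, "rare": 0.19, "ultra": 0.01}, 30, 30, rng))  # ('ultra', 0)
```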
ID: gacha-box Tags: stochastic, game, random Probes: count_common, count_rare, count_ultra, spent Context: - Example page: /examples/gacha-box - Playground: /playground?example=gacha-box - Tag pages: /example/tags/stochastic, /example/tags/game, /example/tags/random ## Parameters ### Default (default) ```yaml num_draws: 50 cost_per_draw: 10 rarities: common: 0.8 rare: 0.19 ultra: 0.01 pity_after: 30 seed: 42 ``` ## Simulation Code ```python import random from tys import probe, progress def simulate(cfg: dict): import simpy env = simpy.Environment() num_draws = cfg["num_draws"] # total pulls to make cost_per_draw = cfg["cost_per_draw"] # currency spent each pull rarities = cfg["rarities"] # mapping of rarity -> probability pity_after = cfg.get("pity_after", 0) # guarantee top rarity after this many seed = cfg.get("seed", 12345) # RNG seed for reproducibility rng = random.Random(seed) names = list(rarities.keys()) weights = [rarities[n] for n in names] counts = {n: 0 for n in names} # cumulative draws per rarity spent = 0 # total currency spent draws_since_top = 0 # draws since last top rarity pull top_rarity = names[-1] # most coveted rarity in the pool done = env.event() def recorder(): while True: for r in names: probe(f"count_{r}", env.now, counts[r]) probe("spent", env.now, spent) yield env.timeout(1) env.process(recorder()) def dynamics(): nonlocal spent, draws_since_top for i in range(num_draws): if pity_after and draws_since_top >= pity_after: rarity = top_rarity draws_since_top = 0 else: rarity = rng.choices(names, weights=weights)[0] if rarity == top_rarity: draws_since_top = 0 else: draws_since_top += 1 counts[rarity] += 1 spent += cost_per_draw progress(int(100 * (i + 1) / num_draws)) yield env.timeout(1) done.succeed({"spent": spent} | {f"count_{k}": v for k, v in counts.items()}) env.process(dynamics()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: How does the pity mechanic work? A: If the count of draws since the top rarity meets or exceeds pity_after, the next draw automatically yields the top rarity. Q: How are rarities chosen on a normal draw? A: random.choices selects a rarity using the probability weights configured for each tier. Q: What metrics does the recorder process track? A: It probes count_ for each rarity as well as total spent after every draw. --- # Game of Life (Beginner) Conway's Game of Life producing grid frames. 
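The whole model reduces to one cell-update rule applied on a grid whose edges wrap around; a minimal sketch of that rule, mirroring the neighbour counting used in the full code below:

```python
def next_state(grid, r, c):
    """Conway's rule for one cell; neighbour indices wrap around the edges (torus)."""
    rows, cols = len(grid), len(grid[0])
    neighbours = sum(
        grid[(r + dr) % rows][(c + dc) % cols]
        for dr in (-1, 0, 1) for dc in (-1, 0, 1)
        if (dr, dc) != (0, 0)
    )
    alive = grid[r][c]
    return 1 if (alive and neighbours in (2, 3)) or (not alive and neighbours == 3) else 0

blinker = [[0, 0, 0], [1, 1, 1], [0, 0, 0]]
print(next_state(blinker, 0, 1))  # 1 -- a dead cell with exactly three live neighbours is born
```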
ID: game-of-life Tags: cellular-automata, grid Probes: alive Context: - Example page: /examples/game-of-life - Playground: /playground?example=game-of-life - Collection page: /example/collections/cellular-automata - Tag pages: /example/tags/cellular-automata, /example/tags/grid ## Parameters ### Default (default) ```yaml rows: 80 cols: 80 steps: 60 initial_chance: 0.3 ``` ## Simulation Code ```python from tys import progress, frame, probe import random def simulate(cfg: dict): rows = cfg["rows"] cols = cfg["cols"] steps = cfg["steps"] chance = cfg.get("initial_chance", 0.2) grid = [[1 if random.random() < chance else 0 for _ in range(cols)] for _ in range(rows)] probe("alive", 0, sum(sum(row) for row in grid)) def count_neighbors(r: int, c: int) -> int: total = 0 for dr in (-1, 0, 1): for dc in (-1, 0, 1): if dr == 0 and dc == 0: continue rr = (r + dr) % rows cc = (c + dc) % cols total += grid[rr][cc] return total for step in range(steps): frame(step, grid) new_grid = [[0] * cols for _ in range(rows)] for r in range(rows): for c in range(cols): n = count_neighbors(r, c) if grid[r][c]: new_grid[r][c] = 1 if n in (2, 3) else 0 else: new_grid[r][c] = 1 if n == 3 else 0 grid = new_grid alive = sum(sum(row) for row in grid) probe("alive", step + 1, alive) progress(int(100 * (step + 1) / steps)) frame(steps, grid) alive = sum(sum(row) for row in grid) return {"alive": alive} def requirements(): return { "external": [] } ``` ## FAQ Q: Why use wrapping indexes in count_neighbors? A: Neighbor positions are computed with modulo so the grid edges wrap around like a torus. Q: How is the next generation computed? A: Each step counts living neighbours and applies the standard rules to produce a new grid. Q: What does the frame function record? A: It snapshots the entire grid each step so the evolution can be replayed. --- # Goal-Seeking Decay (Beginner) A goal-seeking decay simulation modeling exponential approach toward a target. ID: goal-seeking-decay Tags: decay, goal, balancing-loop, exponential, control, first-order-dynamics, time-delay Stocks: level Flows: approach_rate Feedback Loops: goal-seeking (balancing) Probes: level, approach_rate Context: - Example page: /examples/goal-seeking-decay - Playground: /playground?example=goal-seeking-decay - Collection page: /example/collections/control-systems-delays - Tag pages: /example/tags/decay, /example/tags/goal, /example/tags/balancing-loop, /example/tags/exponential, /example/tags/control, /example/tags/first-order-dynamics, /example/tags/time-delay ## Parameters ### Default (default) ```yaml initial_level: 100 goal_level: 0 rate_constant: 0.1 steps: 50 ``` ## Simulation Code ```python from tys import probe, progress def simulate(cfg: dict): import simpy env = simpy.Environment() level = cfg["initial_level"] goal = cfg["goal_level"] k = cfg["rate_constant"] steps = cfg["steps"] done = env.event() def run(): nonlocal level for i in range(steps): change = k * (goal - level) level += change probe("level", env.now, level) probe("approach_rate", env.now, change) yield env.timeout(1) progress(100) done.succeed({"final_level": level}) env.process(run()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: Why does progress slow near the goal? A: Because each step changes the level by k times the remaining gap, so as the gap shrinks the increments shrink too. Q: What does the approach_rate probe show? A: It records the size of each incremental change toward the goal at every step. 
Q: Is SimPy required here? A: The model uses SimPy purely for consistent timing and progress reporting. --- # Hospital ER Patient Flow (Intermediate) Queuing theory demo of patient flow through an emergency department. ID: hospital-er Tags: queue, healthcare, resource-management, prioritization Stocks: beds Flows: arrivals, departures Feedback Loops: staffing effects Probes: wait_low, wait_med, wait_high, bed_util, lwbs, served Context: - Example page: /examples/hospital-er - Playground: /playground?example=hospital-er - Collection page: /example/collections/scheduling-priority-queues - Tag pages: /example/tags/queue, /example/tags/healthcare, /example/tags/resource-management, /example/tags/prioritization ## Parameters ### Baseline (default) ```yaml arrival_rate: 0.1 # patients per minute (~6 per hour) sim_time: 480 # minutes (8 hours) beds: 5 nurses: 2 doctors: 1 imaging_capacity: 1 or_capacity: 1 lwbs_threshold: 60 severity_probs: - 0.5 # low - 0.3 # medium - 0.2 # high nurse_time_mean: 10 doctor_time_mean: 15 imaging_time: 30 or_time: 90 post_imaging_bed_time: 20 post_or_bed_time: 60 escalate_to_imaging: 0.2 escalate_to_or: 0.05 seed: 42 ``` ### With Extra Nurse ```yaml arrival_rate: 0.1 sim_time: 480 beds: 5 nurses: 3 # one additional nurse doctors: 1 imaging_capacity: 1 or_capacity: 1 lwbs_threshold: 60 severity_probs: - 0.5 - 0.3 - 0.2 nurse_time_mean: 10 doctor_time_mean: 15 imaging_time: 30 or_time: 90 post_imaging_bed_time: 20 post_or_bed_time: 60 escalate_to_imaging: 0.2 escalate_to_or: 0.05 seed: 42 ``` ## Simulation Code ```python import random from tys import probe, progress def simulate(cfg: dict): import simpy env = simpy.Environment() random.seed(cfg.get("seed", 42)) arrival_rate = cfg["arrival_rate"] # mean arrivals per minute sim_time = cfg["sim_time"] beds = simpy.Resource(env, capacity=cfg["beds"]) nurses = simpy.Resource(env, capacity=cfg["nurses"]) doctors = simpy.Resource(env, capacity=cfg["doctors"]) imaging = simpy.Resource(env, capacity=cfg.get("imaging_capacity", 1)) operating = simpy.Resource(env, capacity=cfg.get("or_capacity", 1)) severities = [1, 2, 3] severity_probs = cfg["severity_probs"] lwbs_threshold = cfg["lwbs_threshold"] # leave without being seen wait limit nurse_time = cfg["nurse_time_mean"] doctor_time = cfg["doctor_time_mean"] imaging_time = cfg["imaging_time"] or_time = cfg["or_time"] escalate_img = cfg["escalate_to_imaging"] escalate_or = cfg["escalate_to_or"] post_img = cfg["post_imaging_bed_time"] post_or = cfg["post_or_bed_time"] total_patients = 0 served = 0 lwbs = 0 done = env.event() def utilisation(): while True: probe("bed_util", env.now, len(beds.users) / beds.capacity) progress(min(int(100 * env.now / sim_time), 100)) yield env.timeout(1) env.process(utilisation()) def patient(name: str): nonlocal served, lwbs arrival = env.now severity = random.choices(severities, weights=severity_probs)[0] with beds.request() as req: results = yield req | env.timeout(lwbs_threshold) if req not in results: lwbs += 1 probe("lwbs", env.now, lwbs) return wait = env.now - arrival label = {1: "wait_low", 2: "wait_med", 3: "wait_high"}[severity] probe(label, env.now, wait) with nurses.request() as nreq: yield nreq yield env.timeout(random.expovariate(1.0 / nurse_time)) with doctors.request() as dreq: yield dreq yield env.timeout(random.expovariate(1.0 / doctor_time)) if random.random() < escalate_img: with imaging.request() as img: yield img yield env.timeout(imaging_time) with beds.request() as req2: yield req2 yield env.timeout(post_img) if 
random.random() < escalate_or: with operating.request() as opr: yield opr yield env.timeout(or_time) with beds.request() as req3: yield req3 yield env.timeout(post_or) served += 1 probe("served", env.now, served) def arrivals(): nonlocal total_patients i = 0 while env.now < sim_time: env.process(patient(f"Patient{i}")) i += 1 total_patients += 1 delay = random.expovariate(arrival_rate) yield env.timeout(delay) done.succeed({"arrived": total_patients, "served": served, "lwbs": lwbs}) env.process(arrivals()) env.run(until=done) progress(100) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: What makes patients leave without being seen? A: When requesting a bed, patients wait only lwbs_threshold time units; if the timeout wins they increment the LWBS count. Q: How are acuity levels assigned? A: Each patient randomly selects a severity from 1–3 using the configured probability weights. Q: Why use separate resources for nurses and doctors? A: SimPy Resources limit concurrent service, creating queues that capture real bottlenecks. --- # Inspection Game (Intermediate) A poacher vs. ranger game with limited patrols and adaptive poachers. ID: inspection-game Tags: game-theory, reinforcement, monitoring Probes: catch_rate, ranger_coverage, poacher_payoff Context: - Example page: /examples/inspection-game - Playground: /playground?example=inspection-game - Collection page: /example/collections/game-dynamics - Tag pages: /example/tags/game-theory, /example/tags/reinforcement, /example/tags/monitoring ## Parameters ### Default (default) ```yaml num_sites: 5 patrols: 2 num_poachers: 3 reward: 10 penalty: 50 learning_rate: 0.2 epsilon: 0.1 sim_time: 100 ``` ## Simulation Code ```python import random from tys import probe, progress def simulate(cfg: dict): """Simulate repeated patrols and adaptive poachers.""" import simpy env = simpy.Environment() n_sites = cfg["num_sites"] # total locations that could be patrolled patrols = cfg["patrols"] # how many sites the ranger covers each day num_poachers = cfg["num_poachers"] reward = cfg["reward"] # gain for an uncaught poacher penalty = cfg["penalty"] # cost if caught red-handed lr = cfg.get("learning_rate", 0.1) # update weight for reinforcement eps = cfg.get("epsilon", 0.1) # exploration probability sim_time = cfg["sim_time"] rng = random.Random(cfg.get("seed", 123)) q_values = [[0.0 for _ in range(n_sites)] for _ in range(num_poachers)] catch_count = 0 attempts = 0 total_payoff = 0.0 done = env.event() def day(): nonlocal catch_count, attempts, total_payoff for t in range(sim_time): patrol_sites = rng.sample(range(n_sites), k=patrols) probe("ranger_coverage", env.now, patrols / n_sites) for p in range(num_poachers): q = q_values[p] if rng.random() < eps: site = rng.randrange(n_sites) else: best = max(q) best_sites = [i for i, v in enumerate(q) if v == best] site = rng.choice(best_sites) attempts += 1 if site in patrol_sites: catch_count += 1 payoff = -penalty else: payoff = reward q[site] = (1 - lr) * q[site] + lr * payoff total_payoff += payoff catch_rate = catch_count / attempts avg_payoff = total_payoff / attempts probe("catch_rate", env.now, catch_rate) probe("poacher_payoff", env.now, avg_payoff) progress(100 * (t + 1) / sim_time) yield env.timeout(1) done.succeed({"catch_rate": catch_rate, "avg_payoff": avg_payoff}) env.process(day()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: How do poachers learn which sites to pick? 
A: Each day they update a Q-value table using reinforcement: q[site]=(1-lr)*q[site]+lr*payoff. Q: How are patrol sites chosen? A: The ranger randomly samples patrols number of sites from the full set each day. Q: What do the probes measure? A: catch_rate records catches over attempts while poacher_payoff averages the reward or penalty. --- # Inventory Oscillation (Beginner) Bullwhip-style swings from naive reordering with shipping delay. ID: inventory-oscillation Tags: inventory, delay, bullwhip Stocks: inventory Flows: demand, shipments Feedback Loops: delayed reorder overshoot Probes: inventory, pipeline Context: - Example page: /examples/inventory-oscillation - Playground: /playground?example=inventory-oscillation - Tag pages: /example/tags/inventory, /example/tags/delay, /example/tags/bullwhip ## Parameters ### Default (default) ```yaml initial_inventory: 100 target_inventory: 100 daily_demand: 10 lead_time: 3 review_period: 1 sim_time: 60 ``` ## Simulation Code ```python from tys import probe, progress def simulate(cfg: dict): import simpy env = simpy.Environment() inventory = cfg["initial_inventory"] target = cfg["target_inventory"] daily_demand = cfg["daily_demand"] lead_time = cfg["lead_time"] review_period = cfg["review_period"] sim_time = cfg["sim_time"] pipeline = [] # list of (arrival_time, qty) done = env.event() def run(): nonlocal inventory, pipeline for day in range(sim_time): inventory = max(inventory - daily_demand, 0) arrivals = [qty for (t, qty) in pipeline if t == day] for qty in arrivals: inventory += qty pipeline = [(t, q) for (t, q) in pipeline if t != day] if day % review_period == 0: order_qty = max(target - inventory, 0) if order_qty > 0: pipeline.append((day + lead_time, order_qty)) probe("inventory", env.now, inventory) probe("pipeline", env.now, sum(q for _, q in pipeline)) progress(int(100 * (day + 1) / sim_time)) yield env.timeout(1) done.succeed({"final_inventory": inventory}) env.process(run()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` --- # Logistic Growth (Beginner) A logistic growth simulation of population increase with saturation. ID: logistic-growth Tags: population, capacity, nonlinear-dynamics, reinforcing-loop Stocks: population Flows: growth Feedback Loops: reinforcing adoption, saturation constraint Probes: population, growth Context: - Example page: /examples/logistic-growth - Playground: /playground?example=logistic-growth - Collection page: /example/collections/population-dynamics - Tag pages: /example/tags/population, /example/tags/capacity, /example/tags/nonlinear-dynamics, /example/tags/reinforcing-loop ## Parameters ### Default (default) ```yaml initial_pop: 10 growth_rate: 0.2 carrying_capacity: 100 steps: 40 ``` ## Simulation Code ```python from tys import probe, progress def simulate(cfg: dict): import simpy env = simpy.Environment() population = cfg["initial_pop"] r = cfg["growth_rate"] K = cfg["carrying_capacity"] steps = cfg["steps"] done = env.event() def run(): nonlocal population for t in range(steps): growth = r * population * (1 - population / K) population += growth probe("population", env.now, population) probe("growth", env.now, growth) progress(int(100 * (t + 1) / steps)) yield env.timeout(1) done.succeed({"final_population": population}) env.process(run()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: When does growth slow down? 
A: The logistic equation multiplies r by population*(1 - population/K), so as population nears K the growth term declines. Q: How is the carrying capacity used? A: It sets the upper limit in the logistic term so population never exceeds K. Q: Which probes are charted? A: Both population and instantaneous growth are emitted each step for plotting. --- # Logistic Map (Advanced) A logistic map simulation that iterates x_{n+1}=r*x_n*(1-x_n) to illustrate chaos. ID: logistic-map Tags: population, nonlinear-dynamics Stocks: x Feedback Loops: growth with self-limiting term Probes: x Context: - Example page: /examples/logistic-map - Playground: /playground?example=logistic-map - Collection page: /example/collections/population-dynamics - Tag pages: /example/tags/population, /example/tags/nonlinear-dynamics ## Parameters ### Default (default) ```yaml initial_value: 0.4 growth_rate: 3.7 steps: 50 ``` ## Simulation Code ```python from tys import probe, progress def simulate(cfg: dict): import simpy env = simpy.Environment() r = cfg["growth_rate"] x = cfg["initial_value"] steps = cfg["steps"] done = env.event() def iterate(): nonlocal x for i in range(steps): x = r * x * (1 - x) probe("x", env.now, x) yield env.timeout(1) progress(100) done.succeed({"final_x": x}) env.process(iterate()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: How does the simulation produce chaotic behavior? A: Iterating x = r * x * (1 - x) for high r values like 3.5 leads to period doubling and chaos. Q: How is the map iterated in SimPy? A: The iterate process updates x and yields env.timeout(1) each step so time advances discretely. Q: What does the x probe capture? A: It records the value after each iteration to show bifurcations over time. --- # Predator-Prey (Lotka–Volterra, SciPy) (Intermediate) A predator-prey simulation using the Lotka-Volterra equations. 
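Written out with the parameter names from the config (x = prey, y = predator), the system integrated by solve_ivp is:

$$\frac{dx}{dt} = \alpha x - \beta x y, \qquad \frac{dy}{dt} = \delta x y - \gamma y$$

Here alpha is the prey birth rate, beta the predation rate, delta the predator gain per predation, and gamma the predator death rate.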
ID: lotka-volterra Tags: population, ecosystem, nonlinear-dynamics Stocks: prey, predator Flows: prey_births, predations, predator_deaths Feedback Loops: predation cycle Probes: prey, predator Context: - Example page: /examples/lotka-volterra - Playground: /playground?example=lotka-volterra - Collection page: /example/collections/population-dynamics - Tag pages: /example/tags/population, /example/tags/ecosystem, /example/tags/nonlinear-dynamics ## Parameters ### Default (default) ```yaml initial_prey: 10 initial_predator: 5 alpha: 1.5 beta: 1.0 delta: 1.0 gamma: 3.0 sim_time: 20 time_step: 0.01 ``` ## Simulation Code ```python from tys import probe, progress def simulate(cfg: dict): import numpy as np from scipy.integrate import solve_ivp x0 = float(cfg["initial_prey"]) y0 = float(cfg["initial_predator"]) alpha = cfg["alpha"] beta = cfg["beta"] delta = cfg["delta"] gamma = cfg["gamma"] sim_time = float(cfg["sim_time"]) dt = float(cfg.get("time_step", 0.01)) assert all(p > 0 for p in (alpha, beta, delta, gamma)), "Rates must be positive" assert dt > 0 and sim_time > 0, "Time parameters must be positive" def lv(_, z): x, y = z return [ alpha * x - beta * x * y, # dx/dt delta * x * y - gamma * y # dy/dt ] n_steps = int(round(sim_time / dt)) t_eval = np.linspace(0.0, sim_time, n_steps + 1) sol = solve_ivp( lv, (0.0, sim_time), [x0, y0], t_eval=t_eval, method="RK45", max_step=dt ) prey_series = np.maximum(sol.y[0], 0.0) # clip tiny negatives predator_series = np.maximum(sol.y[1], 0.0) for t, x, y in zip(sol.t, prey_series, predator_series): probe("prey", t, x) probe("predator", t, y) progress(100) return { "final_prey": float(prey_series[-1]), "final_predator": float(predator_series[-1]), } def requirements(): return { "builtin": ["scipy"] } ``` ## FAQ Q: How are the predator-prey equations solved? A: SciPy's solve_ivp integrates the differential system using RK45 and returns values at each evaluation step. Q: Why clamp negative populations? A: The integrator can introduce tiny negatives, so numpy.maximum ensures prey and predator counts stay non-negative. Q: How are evaluation times chosen? A: t_eval is a linspace from 0 to sim_time with step dt so charts align with the solver output. --- # Predator-Prey (Lotka–Volterra, SimPy) (Intermediate) Discrete-time SimPy approximation of the predator–prey model. 
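Instead of calling an ODE solver, this variant advances the same equations with explicit (forward) Euler steps of size $\Delta t$ = time_step, clamping both populations at zero after each update:

$$x_{t+\Delta t} = x_t + (\alpha x_t - \beta x_t y_t)\,\Delta t, \qquad y_{t+\Delta t} = y_t + (\delta x_t y_t - \gamma y_t)\,\Delta t$$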
ID: lotka-volterra-simpy Tags: population, ecosystem, nonlinear-dynamics Stocks: prey, predator Flows: prey_births, predations, predator_deaths Feedback Loops: predation cycle Probes: prey, predator Context: - Example page: /examples/lotka-volterra-simpy - Playground: /playground?example=lotka-volterra-simpy - Collection page: /example/collections/population-dynamics - Tag pages: /example/tags/population, /example/tags/ecosystem, /example/tags/nonlinear-dynamics ## Parameters ### Default (default) ```yaml initial_prey: 10 initial_predator: 5 alpha: 1.5 beta: 1.0 delta: 1.0 gamma: 3.0 sim_time: 20 time_step: 0.01 ``` ## Simulation Code ```python from tys import probe, progress def simulate(cfg: dict): """Simulate predator and prey populations with SimPy.""" import simpy env = simpy.Environment() prey = float(cfg["initial_prey"]) predator = float(cfg["initial_predator"]) alpha = cfg["alpha"] beta = cfg["beta"] delta = cfg["delta"] gamma = cfg["gamma"] sim_time = float(cfg["sim_time"]) dt = float(cfg.get("time_step", 0.01)) assert all(p > 0 for p in (alpha, beta, delta, gamma)), "Rates must be positive" assert dt > 0 and sim_time > 0, "Time parameters must be positive" steps = int(sim_time / dt) done = env.event() def dynamics(): nonlocal prey, predator for i in range(steps): prey_births = alpha * prey predations = beta * prey * predator predator_births = delta * prey * predator predator_deaths = gamma * predator prey += (prey_births - predations) * dt predator += (predator_births - predator_deaths) * dt prey = max(prey, 0.0) predator = max(predator, 0.0) probe("prey", env.now, prey) probe("predator", env.now, predator) progress(int(100 * (i + 1) / steps)) yield env.timeout(dt) done.succeed({ "final_prey": prey, "final_predator": predator, }) env.process(dynamics()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: Why avoid SciPy in this implementation? A: The populations are stepped forward with Euler integration inside SimPy so it can run even when SciPy is unavailable. Q: How does the time_step affect accuracy? A: A larger dt leads to bigger numerical errors since Euler integration is only first order. Q: Where is env.timeout used? A: Each discrete step yields env.timeout(dt) to advance the SimPy clock. --- # Machines Simulation (Advanced) A machines simulation depicting resource allocation and reliability. 
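The interesting coordination pattern is the router, which only dispatches work when it holds both a part and an idle-machine token; a minimal standalone sketch of that handshake with env.all_of (store contents here are illustrative):

```python
import simpy

def router(env, buffer, idle_tokens):
    """Dispatch a part only when a part AND an idle-machine token are both available."""
    part_ev, token_ev = buffer.get(), idle_tokens.get()
    out = yield env.all_of([part_ev, token_ev])       # dict-like: {event: value}
    print(f"t={env.now}: part {out[part_ev]!r} -> machine {out[token_ev]}")
    yield idle_tokens.put(out[token_ev])              # machine announces it is idle again

env = simpy.Environment()
buffer, idle_tokens = simpy.Store(env), simpy.Store(env)
idle_tokens.put(0)       # one machine token
buffer.put("P0")         # one part waiting
env.process(router(env, buffer, idle_tokens))
env.run()
```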
ID: machines Tags: reliability, resource-management, maintenance, queue, throughput Stocks: buffer, idle-token store Flows: in_rate, dispatch_rate Probes: in_rate, dispatch_rate, buffer_level, breakdown, repair Context: - Example page: /examples/machines - Playground: /playground?example=machines - Collection page: /example/collections/reliability-maintenance - Tag pages: /example/tags/reliability, /example/tags/resource-management, /example/tags/maintenance, /example/tags/queue, /example/tags/throughput ## Parameters ### Default (default) ```yaml machines: 3 producer_rate_base: 12 producer_rate_amp: 6 producer_rate_freq: 0.02 service_time_mean: 0.25 breakdown_prob: 0.05 repair_time_mean: 5 sim_time: 100 initial_buffer: 0 ``` ## Simulation Code ```python import math import random import json from tys import probe, progress def simulate(cfg: dict): import simpy env = simpy.Environment() num_machines = cfg["machines"] base_rate = cfg["producer_rate_base"] # parts / sec amp_rate = cfg["producer_rate_amp"] # sine amplitude freq_rate = cfg["producer_rate_freq"] # cycles / sec mean_service_time = cfg["service_time_mean"] # mean service time (exp) breakdown_prob = cfg.get("breakdown_prob", 0.0) # chance of failure per part repair_time_mean = cfg.get("repair_time_mean", 0.0) # avg downtime when failed sim_time = cfg["sim_time"] # run length [sec] initial_buffer = cfg.get("initial_buffer", 0) buffer = simpy.Store(env) idle_tokens = simpy.Store(env, capacity=num_machines) buffer.items.extend([f'INIT_{i}' for i in range(initial_buffer)]) idle_tokens.items.extend(range(num_machines)) # one token per machine util_time = [0.0] * num_machines # cumulative busy time failures = [0] * num_machines # failure counts done = env.event() # marks simulation end def sine(t: float) -> float: return max(0.0, base_rate + amp_rate * math.sin(2 * math.pi * freq_rate * t)) def producer(): pid = 0 while True: rate = sine(env.now) probe("in_rate", env.now, rate) for _ in range(int(rate)): # integer parts/second buffer.put(f'P{pid}') pid += 1 yield env.timeout(1) env.process(producer()) dispatch_count = 0 def router(): nonlocal dispatch_count while True: part_ev = buffer.get() token_ev = idle_tokens.get() out = yield env.all_of([part_ev, token_ev]) # {event: value} part = out[part_ev] mid = out[token_ev] dispatch_count += 1 env.process(machine(mid, part)) env.process(router()) def machine(mid: int, part): svc = random.expovariate(1.0 / mean_service_time) yield env.timeout(svc) util_time[mid] += svc if breakdown_prob > 0 and random.random() < breakdown_prob: failures[mid] += 1 probe("breakdown", env.now, mid) down = random.expovariate(1.0 / repair_time_mean) if repair_time_mean > 0 else 0 if down > 0: yield env.timeout(down) probe("repair", env.now, mid) yield idle_tokens.put(mid) # announce idle def recorder(): nonlocal dispatch_count interval = 0.5 while True: buf_len = len(buffer.items) probe("buffer_level", env.now, buf_len) rate = dispatch_count / interval probe("dispatch_rate", env.now, rate) dispatch_count = 0 yield env.timeout(interval) env.process(recorder()) def stopper(): yield env.timeout(sim_time) done.succeed({ "buffer_final": len(buffer.items), "utilisation": json.dumps({f"M{i}": round(util_time[i] / sim_time, 3) for i in range(num_machines)}), "failures": json.dumps({f"M{i}": failures[i] for i in range(num_machines)}) }) env.process(stopper()) env.run(until=done) progress(100) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: How do tokens keep the machines 
coordinated? A: Each machine releases a token when idle; the router waits for a part and a token so no machine is overloaded. Q: Why modulate the production rate with a sine wave? A: It creates bursts of work so we can see how the system handles fluctuating demand. Q: How are machine breakdowns modelled? A: Each processed part can trigger a failure with breakdown_prob leading to a repair delay. --- # Matchmaking Queue (Intermediate) Simulates skill-based matchmaking with adjustable buckets, skill delta and max wait. ID: matchmaking-queue Tags: matchmaking, queue, game, fairness Probes: queue_len, avg_wait, avg_delta Context: - Example page: /examples/matchmaking-queue - Playground: /playground?example=matchmaking-queue - Collection page: /example/collections/game-dynamics - Tag pages: /example/tags/matchmaking, /example/tags/queue, /example/tags/game, /example/tags/fairness ## Parameters ### Default (default) ```yaml arrival_rate: 2 rating_mean: 1000 rating_std: 300 rating_bucket: 100 skill_delta: 50 max_wait: 30 sim_time: 300 seed: 42 ``` ## Simulation Code ```python import random from dataclasses import dataclass from typing import List from tys import probe, progress @dataclass class Player: rating: float joined: float def closest_opponent(index: int, players: List[Player]) -> int | None: """Return the index of the rating closest to players[index].""" p1 = players[index] best_j = None best_delta = float("inf") for j, p2 in enumerate(players): if j == index: continue delta = abs(p1.rating - p2.rating) if delta < best_delta: best_delta, best_j = delta, j return best_j def bucket_match(index: int, players: List[Player], bucket_width: float, skill_delta: float) -> int | None: """Find a partner in the same rating bucket within the allowed delta.""" p1 = players[index] bucket = int(p1.rating // bucket_width) for j in range(index + 1, len(players)): p2 = players[j] same_bucket = int(p2.rating // bucket_width) == bucket if same_bucket and abs(p1.rating - p2.rating) <= skill_delta: return j return None def record_probes(now: float, waiting: List[Player], wait_times: List[float], deltas: List[float]) -> None: """Emit probe metrics for queue length, wait time and rating delta.""" probe("queue_len", now, len(waiting)) if wait_times: probe("avg_wait", now, sum(wait_times) / len(wait_times)) if deltas: probe("avg_delta", now, sum(deltas) / len(deltas)) def simulate(cfg: dict): import simpy env = simpy.Environment() rng = random.Random(cfg.get("seed", 0)) arrival_rate = cfg["arrival_rate"] # players per second rating_mean = cfg["rating_mean"] # average skill level rating_std = cfg["rating_std"] # variation in skill bucket_width = cfg["rating_bucket"] # size of rating buckets skill_delta = cfg["skill_delta"] # allowed rating difference max_wait = cfg["max_wait"] # seconds before we force a match sim_time = cfg["sim_time"] # total simulation time waiting: List[Player] = [] # players waiting for a match wait_times: List[float] = [] # how long each successful match waited deltas: List[float] = [] # rating differences in matches def spawner(): while True: delay = rng.expovariate(arrival_rate) yield env.timeout(delay) rating = max(0.0, rng.gauss(rating_mean, rating_std)) waiting.append(Player(rating, env.now)) env.process(spawner()) def matchmaker(): while True: waiting.sort(key=lambda p: p.rating) i = 0 while i < len(waiting) - 1: p1 = waiting[i] partner = bucket_match(i, waiting, bucket_width, skill_delta) if partner is None and env.now - p1.joined >= max_wait: partner = closest_opponent(i, waiting) if 
partner is not None: p2 = waiting.pop(partner) waiting.pop(i) wait_times.append((env.now - p1.joined + env.now - p2.joined) / 2) deltas.append(abs(p1.rating - p2.rating)) else: i += 1 record_probes(env.now, waiting, wait_times, deltas) progress(min(int(100 * env.now / sim_time), 100)) yield env.timeout(1) env.process(matchmaker()) done = env.event() def finish(): yield env.timeout(sim_time) done.succeed( { "avg_wait": sum(wait_times) / len(wait_times) if wait_times else 0.0, "avg_rating_delta": sum(deltas) / len(deltas) if deltas else 0.0, } ) env.process(finish()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: What happens if a player waits too long? A: If their wait exceeds max_wait seconds, the matchmaker pairs them with the closest rating regardless of bucket. Q: How are new player arrivals modelled? A: The spawner process uses an exponential delay with arrival_rate to mimic a Poisson stream of players. Q: How does bucket matching maintain fairness? A: Players are grouped by rating_bucket width and matched if their rating difference is within skill_delta. --- # Nested Hierarchy Workflow (Beginner) A nested hierarchy simulation where tasks flow across company departments. ID: nested-hierarchy Tags: workflow, capacity, resource-management, throughput, bottleneck Flows: arrivals, service Probes: Dept0, Dept1, Dept0-Team0, Dept0-Team1, Dept1-Team0, Dept1-Team1 Context: - Example page: /examples/nested-hierarchy - Playground: /playground?example=nested-hierarchy - Collection page: /example/collections/scheduling-priority-queues - Tag pages: /example/tags/workflow, /example/tags/capacity, /example/tags/resource-management, /example/tags/throughput, /example/tags/bottleneck ## Parameters ### Default (default) ```yaml num_departments: 2 teams_per_department: 2 tasks_total: 20 arrival_interval: 1.0 service_time: 2.0 seed: 42 ``` ## Simulation Code ```python from tys import probe, progress def simulate(cfg: dict): import random import simpy env = simpy.Environment() random.seed(cfg.get("seed", 42)) num_departments = cfg["num_departments"] teams_per_department = cfg["teams_per_department"] tasks_total = cfg["tasks_total"] arrival_interval = cfg["arrival_interval"] service_time = cfg["service_time"] processed = 0 class Team: def __init__(self, name: str): self.name = name self.resource = simpy.Resource(env, capacity=1) self.done = 0 class Department: def __init__(self, name: str): self.name = name self.teams = [Team(f"{name}-Team{i}") for i in range(teams_per_department)] self.done = 0 departments = [Department(f"Dept{i}") for i in range(num_departments)] done = env.event() def task_proc(name: str): nonlocal processed dept = random.choice(departments) team = random.choice(dept.teams) with team.resource.request() as req: yield req duration = random.expovariate(1.0 / service_time) yield env.timeout(duration) team.done += 1 dept.done += 1 processed += 1 probe(team.name, env.now, team.done) probe(dept.name, env.now, dept.done) progress(int(100 * processed / tasks_total)) if processed >= tasks_total and not done.triggered: done.succeed({"tasks_processed": processed}) def generator(): for i in range(tasks_total): env.process(task_proc(f"Task{i}")) yield env.timeout(random.expovariate(1.0 / arrival_interval)) env.process(generator()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: How is workload recorded across teams? 
A: Each completed task increments probes for its team and department so the distribution is visible. Q: How is a task assigned to a team? A: The task_proc function selects a random department then a random team within it for each new task. Q: When does the simulation finish? A: Once processed tasks reach tasks_total the done event resolves and env.run stops. --- # Queue Simulation (Beginner) A queue simulation demonstrating throughput limits and system capacity. ID: queue Tags: queue, throughput, bottleneck, discrete-event, service, capacity Stocks: backlog[p] Flows: produced, consumed Probes: backlog_total, incoming_per_sec, outgoing_per_sec Context: - Example page: /examples/queue - Playground: /playground?example=queue - Collection page: /example/collections/scheduling-priority-queues - Tag pages: /example/tags/queue, /example/tags/throughput, /example/tags/bottleneck, /example/tags/discrete-event, /example/tags/service, /example/tags/capacity ## Parameters ### Default (default) ```yaml partitions: 6 consumers: 4 producer_rate_base: 6000 producer_rate_amp: 3000 producer_rate_freq: 0.01 consumer_rate: 4000 hot_skew: 0.3 sim_time: 300 initial_backlog: 0 ``` ## Simulation Code ```python import math from tys import probe, progress def simulate(cfg: dict): import simpy env = simpy.Environment() num_partitions = cfg["partitions"] num_consumers = cfg["consumers"] base_rate = cfg["producer_rate_base"] # messages / sec amp_rate = cfg["producer_rate_amp"] # amplitude freq_rate = cfg["producer_rate_freq"] # cycles / sec consumer_rate = cfg["consumer_rate"] # msgs / sec per consumer sim_time_seconds = cfg["sim_time"] hot_skew = cfg.get("hot_skew", 0.0) initial_backlog_each = cfg.get("initial_backlog", 0) backlog = [initial_backlog_each] * num_partitions assignment = {p: p % num_consumers for p in range(num_partitions)} def recorder(): while True: total = sum(backlog) probe("backlog_total", env.now, total) yield env.timeout(0.5) env.process(recorder()) done = env.event() def current_production_rate(time_sec: float) -> float: return max( 0.0, base_rate + amp_rate * math.sin(2 * math.pi * freq_rate * time_sec), ) def dynamics(): nonlocal backlog for t in range(sim_time_seconds): rate_now = current_production_rate(t) probe("incoming_per_sec", env.now, rate_now) for p in range(num_partitions): if p == 0 and hot_skew > 0: share = hot_skew else: share = (1 - hot_skew) / (num_partitions - 1 or 1) produced = rate_now * share backlog[p] += produced outgoing_this_tick = 0 for c in range(num_consumers): owned_parts = [p for p, owner in assignment.items() if owner == c] if not owned_parts: continue slice_capacity = consumer_rate / len(owned_parts) for p in owned_parts: take = min(backlog[p], slice_capacity) backlog[p] -= take outgoing_this_tick += take probe("outgoing_per_sec", env.now, outgoing_this_tick) utilisation = sum(backlog) / (base_rate * sim_time_seconds) if abs(utilisation - 0.25) < 0.01: progress(5, "25 % utilisation – system healthy") if abs(utilisation - 0.50) < 0.01: progress(25, "50 % utilisation – backlog building") if abs(utilisation - 0.75) < 0.01: progress(50, "75 % utilisation – lag becoming serious") if max(backlog) > 10 * consumer_rate: progress(int(100 * t / sim_time_seconds), "Warning: hot partition overloaded") yield env.timeout(1) progress(100) done.succeed({ "final_backlog_total": sum(backlog), "max_partition_backlog": max(backlog), "lag_seconds_estimated": sum(backlog) / current_production_rate(env.now) }) env.process(dynamics()) env.run(until=done) return done.value def 
requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: How does the producer rate vary? A: current_production_rate computes a sine wave so message bursts follow periodic cycles. Q: How are consumers assigned partitions? A: An assignment map balances partitions across consumers so each worker pulls from its own set. Q: What warns about a hot partition? A: Progress messages flag when any backlog exceeds ten times the consumer rate. --- # Ride-Share Surge Control (Intermediate) Dynamic pricing stabilises rider waits—until high gain and delay turn it into surge whiplash. ID: ride-surge Tags: queue, pricing, elasticity, balancing-loop, delay, oscillation Stocks: Riders Waiting, Idle Drivers, Drivers on Trip Flows: Passenger Arrivals, Driver Logins, Matches, Trip Completions, Driver Logouts Feedback Loops: Wait-driven surge (balancing), High-gain with delay (reinforcing oscillation) Probes: price_multiplier, wait_estimate, riders_waiting, idle_drivers, arrival_rate, driver_logins, matches_per_min Context: - Example page: /examples/ride-surge - Playground: /playground?example=ride-surge - Collection page: /example/collections/scheduling-priority-queues - Tag pages: /example/tags/queue, /example/tags/pricing, /example/tags/elasticity, /example/tags/balancing-loop, /example/tags/delay, /example/tags/oscillation ## Parameters ### Balanced Surge (default) ```yaml seed: 42 time_horizon: 240 initial_price: 1.0 target_wait: 3.0 wait_window: 10 base_demand: 2.2 demand_elasticity: -1.1 supply_elasticity: 0.8 trip_time: 12.0 baseline_active_drivers: 32.0 price_delay_minutes: 10 supply_time_constant: 20.0 price_gain: 0.35 price_cap: 2.5 smoothing_alpha: 0.25 shock_start: 60 shock_end: 120 shock_multiplier: 1.4 ``` ### High-Gain Surge ```yaml seed: 42 time_horizon: 240 initial_price: 1.0 target_wait: 3.0 wait_window: 10 base_demand: 2.2 demand_elasticity: -1.1 supply_elasticity: 0.8 trip_time: 12.0 baseline_active_drivers: 32.0 price_delay_minutes: 10 supply_time_constant: 20.0 price_gain: 0.7 price_cap: 3.0 smoothing_alpha: 0.25 shock_start: 60 shock_end: 120 shock_multiplier: 1.4 ``` ### No Surge (k=0) ```yaml seed: 42 time_horizon: 240 initial_price: 1.0 target_wait: 3.0 wait_window: 10 base_demand: 2.2 demand_elasticity: -1.1 supply_elasticity: 0.8 trip_time: 12.0 baseline_active_drivers: 32.0 price_delay_minutes: 10 supply_time_constant: 20.0 price_gain: 0.0 price_cap: 2.5 smoothing_alpha: 0.0 shock_start: 60 shock_end: 120 shock_multiplier: 1.4 ``` ## Simulation Code ```python from tys import probe, progress def simulate(cfg: dict): import math import random from collections import deque import simpy env = simpy.Environment() horizon = int(cfg.get("time_horizon", 240)) seed = int(cfg.get("seed", 0)) rng = random.Random(seed) base_demand = float(cfg["base_demand"]) demand_elasticity = float(cfg["demand_elasticity"]) supply_elasticity = float(cfg["supply_elasticity"]) trip_time = float(cfg["trip_time"]) baseline_active = float(cfg["baseline_active_drivers"]) target_wait = float(cfg.get("target_wait", 3.0)) wait_window = int(cfg.get("wait_window", 10)) price_gain = float(cfg.get("price_gain", 0.0)) price_cap = float(cfg.get("price_cap", 3.0)) smoothing_alpha = float(cfg.get("smoothing_alpha", 0.0)) initial_price = float(cfg.get("initial_price", 1.0)) price_delay = int(cfg.get("price_delay_minutes", 0)) supply_tau = float(cfg.get("supply_time_constant", 1.0)) shock_start = int(cfg.get("shock_start", 0)) shock_end = int(cfg.get("shock_end", 0)) shock_multiplier = 
float(cfg.get("shock_multiplier", 1.0)) assert wait_window > 0, "wait_window must be positive" assert supply_tau > 0, "supply_time_constant must be positive" assert price_cap >= 1.0, "price_cap must be at least 1.0" def poisson(lam: float) -> int: """Sample a Poisson random variate with mean ``lam``.""" if lam <= 0: return 0 L = math.exp(-lam) k = 0 p = 1.0 while p > L: k += 1 p *= rng.random() return k - 1 price = max(1.0, initial_price) price_history = deque([price] * (max(price_delay, 0) + 1), maxlen=max(price_delay, 0) + 1) queue = 0.0 idle_drivers = baseline_active engaged_drivers = 0.0 match_history = deque([base_demand] * wait_window, maxlen=wait_window) total_matches = 0.0 wait_samples = [] price_samples = [] def recorder(): while True: probe("riders_waiting", env.now, queue) probe("idle_drivers", env.now, idle_drivers) probe("drivers_on_trip", env.now, engaged_drivers) yield env.timeout(1) env.process(recorder()) done = env.event() def dynamics(): nonlocal price, queue, idle_drivers, engaged_drivers, total_matches for minute in range(horizon): shock = shock_multiplier if shock_start <= minute < shock_end else 1.0 arrival_rate = max(0.0, base_demand * (price ** demand_elasticity) * shock) arrivals = poisson(arrival_rate) queue += arrivals price_for_supply = price_history[0] target_active = baseline_active * (price_for_supply ** supply_elasticity) active_total = idle_drivers + engaged_drivers delta_active = (target_active - active_total) / supply_tau proposed_active = max(active_total + delta_active, 0.0) change_active = proposed_active - active_total if change_active >= 0: driver_logins = change_active driver_logouts = 0.0 idle_drivers += driver_logins else: driver_logins = 0.0 driver_logouts = min(-change_active, idle_drivers) idle_drivers -= driver_logouts proposed_active = idle_drivers + engaged_drivers change_active = proposed_active - active_total capacity = min(idle_drivers, queue) matches = max(0.0, capacity) queue -= matches idle_drivers -= matches engaged_drivers += matches total_matches += matches completions = engaged_drivers / trip_time if trip_time > 0 else engaged_drivers completions = min(completions, engaged_drivers) engaged_drivers -= completions idle_drivers += completions match_history.append(max(matches, 1e-6)) avg_service = max(sum(match_history) / len(match_history), 1e-6) wait_estimate = queue / avg_service error = (wait_estimate - target_wait) / max(target_wait, 1e-6) price_raw = price + price_gain * error price_raw = min(max(price_raw, 1.0), price_cap) if 0.0 < smoothing_alpha < 1.0: price_next = (1 - smoothing_alpha) * price + smoothing_alpha * price_raw else: price_next = price_raw price_history.append(price_next) probe("price_multiplier", env.now, price) probe("wait_estimate", env.now, wait_estimate) probe("arrival_rate", env.now, arrival_rate) probe("driver_logins", env.now, driver_logins) probe("driver_logouts", env.now, driver_logouts) probe("matches_per_min", env.now, matches) probe("trip_completions", env.now, completions) price_samples.append(price) wait_samples.append(wait_estimate) if minute == shock_start: progress(int(100 * minute / horizon), "🌧️ Demand shock hits") if minute == shock_end: progress(int(100 * minute / horizon), "☀️ Demand normalises") if price_next >= price_cap * 0.95: progress(int(100 * minute / horizon), "⚠️ Surge near cap") if wait_estimate > 2 * target_wait: progress(int(100 * minute / horizon), "⏱️ Waits running hot") price = price_next yield env.timeout(1) progress(100) sorted_waits = sorted(wait_samples) def 
percentile(data, pct): if not data: return 0.0 k = (len(data) - 1) * pct f = math.floor(k) c = math.ceil(k) if f == c: return data[int(k)] return data[f] * (c - k) + data[c] * (k - f) avg_wait = sum(wait_samples) / len(wait_samples) if wait_samples else 0.0 avg_price = sum(price_samples) / len(price_samples) if price_samples else price price_variance = sum((p - avg_price) ** 2 for p in price_samples) / len(price_samples) if price_samples else 0.0 done.succeed({ "avg_wait_minutes": avg_wait, "p95_wait_minutes": percentile(sorted_waits, 0.95), "peak_wait_minutes": max(wait_samples) if wait_samples else 0.0, "avg_price_multiplier": avg_price, "price_volatility": math.sqrt(price_variance), "completed_trips": total_matches, }) env.process(dynamics()) env.run(until=done) return done.value def requirements(): """Dependencies needed for this simulation.""" return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: What sets the tipping point between smooth control and oscillation? A: It's the product of price gain, supply delay and sensitivity. When their product grows too large the balancing loop crosses the damping boundary and the system rings. Q: Why do waits spike even with surge enabled? A: Demand reacts immediately to price, but additional drivers arrive with a lag. During the shock the fleet is temporarily capacity constrained until supply catches up. Q: Does higher surge always reduce waits? A: No. Beyond the edge, a higher gain over-suppresses demand, then overshoots supply, creating whiplash and higher wait variance. Q: What would stabilise a whiplashy system? A: Lower the gain, add smoothing, cap price changes, or shorten the supply delay with tactics like pre-positioning or scheduled incentives. Q: What real-world analogues fit this pattern? A: Any price-as-controller with delayed capacity—electricity markets with generator start-up lags, delivery platforms, even cloud spot pricing. --- # Rule 110 (Beginner) Elementary cellular automaton with complex emergent behavior. ID: rule-110 Tags: cellular-automata, grid Probes: ones Context: - Example page: /examples/rule-110 - Playground: /playground?example=rule-110 - Collection page: /example/collections/cellular-automata - Tag pages: /example/tags/cellular-automata, /example/tags/grid ## Parameters ### Default (default) ```yaml width: 128 steps: 128 ``` ## Simulation Code ```python from tys import progress, frame, probe def simulate(cfg: dict): width = cfg["width"] steps = cfg["steps"] state = 1 << (width // 2) mask = (1 << width) - 1 rule = [0, 1, 1, 1, 0, 1, 1, 0] def row_from_state(s: int) -> list[int]: return [1 if (s >> (width - 1 - i)) & 1 else 0 for i in range(width)] rows: list[list[int]] = [] for step in range(steps): row = row_from_state(state) rows.append(row) probe("ones", step, sum(row)) progress(int(100 * (step + 1) / steps)) new_state = 0 for i in range(width): left = (state >> (width - i)) & 1 if i > 0 else 0 center = (state >> (width - 1 - i)) & 1 right = (state >> (width - 2 - i)) & 1 if i < width - 1 else 0 index = (left << 2) | (center << 1) | right if rule[index]: new_state |= 1 << (width - 1 - i) state = new_state & mask row = row_from_state(state) rows.append(row) frame(0, rows) ones = sum(row) return {"ones": ones} def requirements(): return { "external": [] } ``` --- # Rule 30 (Beginner) Elementary cellular automaton that evolves to chaotic patterns. 
ID: rule-30 Tags: cellular-automata, grid Probes: ones Context: - Example page: /examples/rule-30 - Playground: /playground?example=rule-30 - Collection page: /example/collections/cellular-automata - Tag pages: /example/tags/cellular-automata, /example/tags/grid ## Parameters ### Default (default) ```yaml width: 128 steps: 128 ``` ## Simulation Code ```python from tys import progress, frame, probe def simulate(cfg: dict): width = cfg["width"] steps = cfg["steps"] state = 1 << (width // 2) mask = (1 << width) - 1 def row_from_state(s: int) -> list[int]: return [1 if (s >> (width - 1 - i)) & 1 else 0 for i in range(width)] rows: list[list[int]] = [] for step in range(steps): row = row_from_state(state) rows.append(row) probe("ones", step, sum(row)) progress(int(100 * (step + 1) / steps)) state = ((state >> 1) ^ (state | (state << 1))) & mask row = row_from_state(state) rows.append(row) frame(0, rows) ones = sum(row) return {"ones": ones} def requirements(): return { "external": [] } ``` --- # Rumor vs Counterwave (Intermediate) Competing rumor and fact-check messages racing across a social network. ID: rumor-counterwave Tags: competing-contagion, social-network, rumor, fact-check Stocks: rumor, correction Flows: transmissions Probes: rumor, correction Context: - Example page: /examples/rumor-counterwave - Playground: /playground?example=rumor-counterwave - Collection page: /example/collections/social-information-contagion - Tag pages: /example/tags/competing-contagion, /example/tags/social-network, /example/tags/rumor, /example/tags/fact-check ## Parameters ### Default (default) ```yaml num_nodes: 200 avg_degree: 6 rumor_seed: 5 counter_seed: 5 rumor_p: 0.3 rumor_decay: 0.05 correction_p: 0.25 correction_decay: 0.05 counter_delay: 5 sim_time: 50 seed: 12345 ``` ## Simulation Code ```python from tys import probe, progress def simulate(cfg: dict): """Simulate competing contagions on a random social graph.""" import random import simpy env = simpy.Environment() rng = random.Random(cfg.get("seed", 12345)) n = cfg["num_nodes"] # number of users in the network degree = cfg["avg_degree"] # average connections per user rumor_p = cfg["rumor_p"] # chance a rumor holder forwards to each neighbour rumor_decay = cfg["rumor_decay"] # daily chance a rumor fades correction_p = cfg["correction_p"] # chance a correction holder forwards correction_decay = cfg["correction_decay"] counter_delay = cfg["counter_delay"] # days before fact-check seeding rumor_seed = cfg["rumor_seed"] # initial rumor carriers counter_seed = cfg["counter_seed"] # initial correction carriers sim_time = cfg["sim_time"] neighbors = {i: set() for i in range(n)} for i in range(n): while len(neighbors[i]) < degree: j = rng.randrange(n) if j != i and j not in neighbors[i]: neighbors[i].add(j) neighbors[j].add(i) rumor = set(rng.sample(range(n), rumor_seed)) correction = set() initial_rumor = len(rumor) initial_correction = counter_seed rumor_spread = 0 correction_spread = 0 rumor_peak = len(rumor) rumor_peak_time = 0 correction_half_time = None done = env.event() def dynamics(): nonlocal rumor, correction, rumor_peak, rumor_peak_time, correction_half_time nonlocal rumor_spread, correction_spread for t in range(sim_time): new_rumor = set() new_correction = set() if t == counter_delay: seeds = rng.sample([x for x in range(n) if x not in correction], counter_seed) new_correction.update(seeds) for node in list(rumor): for neigh in neighbors[node]: if neigh not in rumor and neigh not in correction: if rng.random() < rumor_p: new_rumor.add(neigh) 
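                            # Note: new_rumor is a set, so a neighbour reached by several carriers in the
                            # same day is converted only once when the day's updates are applied below.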
rumor_spread += 1 if rng.random() < rumor_decay: rumor.remove(node) for node in list(correction): for neigh in neighbors[node]: if neigh not in correction: if neigh in rumor: rumor.discard(neigh) if rng.random() < correction_p: new_correction.add(neigh) correction_spread += 1 if rng.random() < correction_decay: correction.remove(node) rumor.update(new_rumor - correction) correction.update(new_correction) probe("rumor", env.now, len(rumor)) probe("correction", env.now, len(correction)) if len(rumor) > rumor_peak: rumor_peak = len(rumor) rumor_peak_time = t if correction_half_time is None and len(correction) >= n / 2: correction_half_time = t progress(int(100 * t / sim_time)) yield env.timeout(1) progress(100) final_coverage = len(correction) / n * 100 time_gap = None if correction_half_time is not None: time_gap = correction_half_time - rumor_peak_time done.succeed({ "peak_rumor": rumor_peak, "correction_coverage": final_coverage, "time_gap": time_gap, "R0_rumor": rumor_spread / initial_rumor if initial_rumor else 0, "R0_correction": correction_spread / initial_correction if initial_correction else 0, }) env.process(dynamics()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: When is the fact-check counterwave seeded? A: At the configured counter_delay time the simulation introduces counter_seed nodes spreading the correction. Q: How does rumor propagation stop? A: Each day rumor holders may forget due to rumor_decay or switch to the correction when contacted. Q: Which metrics summarize spread? A: The result object reports peak_rumor, correction_coverage and basic reproduction numbers for both waves. --- # Savings vs Credit-Card Debt (Beginner) A savings vs debt simulation comparing compounding interest effects. ID: savings-vs-debt Tags: debt, reinforcing-loop, exponential Stocks: savings_balance, debt_balance Feedback Loops: compound interest on savings (reinforcing), compound interest on debt (reinforcing) Probes: savings_balance, debt_balance Context: - Example page: /examples/savings-vs-debt - Playground: /playground?example=savings-vs-debt - Collection page: /example/collections/financial-flows - Tag pages: /example/tags/debt, /example/tags/reinforcing-loop, /example/tags/exponential ## Parameters ### Default (default) ```yaml initial_savings: 1000 initial_debt: 2000 savings_rate: 0.005 debt_rate: 0.015 monthly_deposit: 100 monthly_payment: 50 months: 36 ``` ## Simulation Code ```python from tys import probe, progress def simulate(cfg: dict): import simpy env = simpy.Environment() savings = cfg["initial_savings"] debt = cfg["initial_debt"] savings_rate = cfg["savings_rate"] # monthly interest rate debt_rate = cfg["debt_rate"] # monthly interest rate deposit = cfg["monthly_deposit"] # deposit to savings each month payment = cfg["monthly_payment"] # debt payment each month months = cfg["months"] done = env.event() def cycle(): nonlocal savings, debt for m in range(months): savings *= 1 + savings_rate debt *= 1 + debt_rate savings += deposit debt = max(0, debt - payment) probe("savings_balance", env.now, savings) probe("debt_balance", env.now, debt) progress(int(100 * (m + 1) / months)) yield env.timeout(1) done.succeed({"final_savings": savings, "final_debt": debt}) env.process(cycle()) env.run(until=done) return done.value def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: Which balance grows faster? 
A: With the default rates the debt compounds faster per dollar (debt_rate is 0.015 per month versus a savings_rate of 0.005), so whether savings overtake debt depends on how far the monthly_deposit and monthly_payment offset that gap; the savings_balance and debt_balance probes let you compare the two directly.

Q: How is interest applied each month?
A: Balances are multiplied by 1 + savings_rate or debt_rate before deposits and payments.

Q: Can the debt balance go negative?
A: No. After each payment the balance is clamped at zero, so it stays there once the debt is paid off.

---

# SIR Model with Vaccination (Intermediate)

An SIR vaccination simulation modeling epidemics with optional vaccination.

ID: sir-vaccination
Tags: population, reinforcing-loop, balancing-loop, control
Stocks: susceptible, infected, recovered
Flows: infections, recoveries, vaccinations
Feedback Loops: disease spread (reinforcing), herd immunity (balancing)
Probes: susceptible, infected, recovered
Context:
- Example page: /examples/sir-vaccination
- Playground: /playground?example=sir-vaccination
- Collection page: /example/collections/population-dynamics
- Tag pages: /example/tags/population, /example/tags/reinforcing-loop, /example/tags/balancing-loop, /example/tags/control

## Parameters

### Default (default)

```yaml
population: 1000
initial_infected: 10
beta: 0.3
gamma: 0.1
vaccination_rate: 0.01
vaccinate: true
sim_days: 160
```

## Simulation Code

```python
from tys import probe, progress

def simulate(cfg: dict):
    import simpy

    env = simpy.Environment()

    population = cfg["population"]
    infected = cfg["initial_infected"]
    beta = cfg["beta"]      # transmission rate per day
    gamma = cfg["gamma"]    # recovery rate per day
    vacc_rate = cfg.get("vaccination_rate", 0.0)
    vaccinate = cfg.get("vaccinate", False)
    days = cfg["sim_days"]

    susceptible = population - infected
    recovered = 0.0
    vaccinated_total = 0.0

    done = env.event()

    def recorder():
        while True:
            probe("susceptible", env.now, susceptible)
            probe("infected", env.now, infected)
            probe("recovered", env.now, recovered)
            yield env.timeout(1)

    env.process(recorder())

    def dynamics():
        nonlocal susceptible, infected, recovered, vaccinated_total
        for d in range(days):
            new_infections = beta * susceptible * infected / population  # mass-action SIR term
            new_recoveries = gamma * infected
            if vaccinate:
                new_vaccinated = vacc_rate * susceptible
            else:
                new_vaccinated = 0.0
            susceptible = max(susceptible - new_infections - new_vaccinated, 0.0)
            infected = max(infected + new_infections - new_recoveries, 0.0)
            recovered = min(recovered + new_recoveries + new_vaccinated, population)
            vaccinated_total += new_vaccinated
            if infected < 1:
                progress(int(100 * d / days), "🎉 Epidemic nearly over")
            yield env.timeout(1)
        progress(100)
        done.succeed({
            "final_infected": infected,
            "vaccinated_total": vaccinated_total
        })

    env.process(dynamics())
    env.run(until=done)
    return done.value

def requirements():
    return {
        "external": ["simpy==4.1.1"],
    }
```

## FAQ

Q: How is vaccination modeled?
A: If vaccinate is true, each day susceptible individuals are reduced by vacc_rate * susceptible and moved to the recovered count.

Q: How does infection spread each day?
A: New infections equal beta * susceptible * infected divided by the total population.

Q: When is progress text shown?
A: Once infected falls below one, the simulation emits a progress message noting the epidemic is nearly over.

---

# Sleep Debt Simulation (Intermediate)

A sleep debt simulation tracking caffeine, sleep patterns, and debt buildup.
ID: sleep-debt
Tags: debt, reinforcing-loop, balancing-loop
Stocks: sleep_debt_hours
Feedback Loops: coffee reduces sleep (reinforcing), circadian pressure triggers sleep (balancing)
Probes: sleep_debt, subjective_energy
Context:
- Example page: /examples/sleep-debt
- Playground: /playground?example=sleep-debt
- Collection page: /example/collections/financial-flows
- Tag pages: /example/tags/debt, /example/tags/reinforcing-loop, /example/tags/balancing-loop

## Parameters

### Default (default)

```yaml
initial_debt: 6
sim_days: 30
sleep_need: 8
coffee_mg: 100
half_life: 5
cups_per_3h: 0.3
debt_to_drink: 5
nap_repay: 1.5
weekend_nap: true
seed: 42
```

## Simulation Code

```python
import random

from tys import probe, progress

def simulate(cfg: dict):
    import simpy

    env = simpy.Environment()

    sleep_debt_hours = cfg["initial_debt"]   # current sleep debt (h)
    days = cfg["sim_days"]                   # simulation length (d)
    sleep_need = cfg["sleep_need"]           # ideal nightly sleep (h)
    coffee_hit = cfg["coffee_mg"]            # mg caffeine per cup
    half_life = cfg["half_life"]             # caffeine half-life (h)
    cups_per_3h = cfg["cups_per_3h"]         # cups every 3h when sleepy
    drink_threshold = cfg["debt_to_drink"]   # start coffee above this debt
    nap_repay = cfg["nap_repay"]             # hours debt repaid by 20-min nap
    weekend_nap = cfg["weekend_nap"]         # take Saturday nap? (bool)

    caffeine_mg = 0.0                        # mg in bloodstream
    caffeine_total = 0.0                     # accumulated for the hourly average
    rng = random.Random(cfg["seed"])
    hours = 24 * days

    done = env.event()

    def recorder():
        while True:
            energy = max(0, 1 - sleep_debt_hours / 10) + caffeine_mg / 400
            probe("sleep_debt", env.now, sleep_debt_hours)
            probe("subjective_energy", env.now, energy)
            yield env.timeout(1)

    env.process(recorder())

    def cycle():
        nonlocal sleep_debt_hours, caffeine_mg, caffeine_total
        for h in range(int(hours)):
            day = h // 24
            hour_in_day = h % 24
            caffeine_mg *= 0.5 ** (1 / half_life)
            caffeine_total += caffeine_mg  # sampled once per hour for the reported average
            if 7 <= hour_in_day < 23:
                if sleep_debt_hours > drink_threshold and rng.random() < cups_per_3h:  # sleepy binge
                    caffeine_mg += coffee_hit
                    progress(int(100 * h / hours), f"☕ {day=}, hour {hour_in_day}: another coffee")
            if hour_in_day == 23:
                sleep_loss = min(4, caffeine_mg / 200)  # jittery!
                sleep_hours = max(4, sleep_need - sleep_debt_hours - sleep_loss)  # can't always repay
                sleep_debt_hours = max(0, sleep_debt_hours - sleep_hours + sleep_need)  # debt update
            if weekend_nap and hour_in_day == 23 and day % 7 == 5:
                sleep_debt_hours = max(0, sleep_debt_hours - nap_repay)
                progress(int(100 * h / hours), f"🛌 Weekend nap repaid {nap_repay:.1f}h debt")
            if sleep_debt_hours > 20:
                progress(int(100 * h / hours), "⚠︎ Burnout spiral — consider an intervention!")
            yield env.timeout(1)
        progress(100)
        done.succeed({
            "final_debt": sleep_debt_hours,
            "avg_caffeine": caffeine_total / hours,  # true hourly average, not the final reading
        })

    env.process(cycle())
    env.run(until=done)
    return done.value

def requirements():
    return {
        "external": ["simpy==4.1.1"],
    }
```

## FAQ

Q: How does caffeine decay over time?
A: Each hour the caffeine_mg value is multiplied by 0.5^(1/half_life) to simulate metabolism.

Q: When does the person drink coffee?
A: During waking hours if sleep debt exceeds debt_to_drink and a random check based on cups_per_3h passes.

Q: How is nightly sleep handled?
A: At hour 23 the model reduces sleep debt by the hours slept, factoring in caffeine jitters and weekend naps.

---

# Thermostat Simulation (Beginner)

A thermostat simulation showing balancing feedback and time delays.
ID: thermostat
Tags: control, time-delay, balancing-loop
Stocks: indoor_temp
Flows: heat_gain, heat_loss
Feedback Loops: controller chasing set-point (balancing), high gain overshoot (reinforcing)
Probes: indoor_temp, heater_on
Context:
- Example page: /examples/thermostat
- Playground: /playground?example=thermostat
- Collection page: /example/collections/control-systems-delays
- Tag pages: /example/tags/control, /example/tags/time-delay, /example/tags/balancing-loop

## Parameters

### Default (default)

```yaml
initial_temp: 18
outside_temp: 5
setpoint: 21
comfort_band: 0.5
loss_coeff: 0.05
heater_power: 0.8
gain: 1.0
sensor_delay: 3
sim_time: 120
```

## Simulation Code

```python
from tys import probe, progress

def simulate(cfg: dict):
    import simpy

    env = simpy.Environment()

    indoor_temp = cfg["initial_temp"]    # indoor temp (°C)
    outside_temp = cfg["outside_temp"]   # fixed outside temp
    setpoint = cfg["setpoint"]           # desired indoor temp
    band = cfg["comfort_band"]           # ±°C accepted dead-band
    loss_rate = cfg["loss_coeff"]        # heat-loss constant
    heater_kw = cfg["heater_power"]      # °C per tick when on
    control_gain = cfg["gain"]           # controller aggressiveness
    sensor_delay = cfg["sensor_delay"]   # perception delay (ticks)
    sim_time = cfg["sim_time"]

    sensor_history = [indoor_temp] * sensor_delay  # delay line for sensed temp
    heater_on = False

    def recorder():
        while True:
            probe("indoor_temp", env.now, indoor_temp)
            probe("heater_on", env.now, 1 if heater_on else 0)
            yield env.timeout(0.5)

    env.process(recorder())

    done = env.event()

    def dynamics():
        nonlocal indoor_temp, heater_on
        peak_temp = indoor_temp  # track the true maximum for the overshoot result
        for t in range(sim_time):
            sensor_T = sensor_history[0]
            error = setpoint - sensor_T
            heater_on = error > control_gain * band  # bang-bang with gain
            heat_gain = heater_kw if heater_on else 0
            heat_loss = loss_rate * (indoor_temp - outside_temp)
            indoor_temp += heat_gain - heat_loss
            peak_temp = max(peak_temp, indoor_temp)
            sensor_history.append(indoor_temp)
            sensor_history.pop(0)
            if abs(error) < band / 2 and t % 5 == 0:
                progress(int(100 * t / sim_time), "😌 In comfort zone")
            if abs(error) > 3 * band and heater_on:
                progress(int(100 * t / sim_time), "⚠︎ Big overshoot, heater still ON")
            yield env.timeout(1)
        progress(100)
        done.succeed({
            "final_temp": indoor_temp,
            "overshoot": peak_temp - setpoint,  # peak excursion above the set-point over the whole run
        })

    env.process(dynamics())
    env.run(until=done)
    return done.value

def requirements():
    return {
        "external": ["simpy==4.1.1"],
    }
```

## FAQ

Q: Why does the system overshoot the setpoint?
A: The heater uses a delayed sensor reading, so decisions lag behind the true temperature and overshoot occurs.

Q: How is the heater turned on or off?
A: The controller compares the sensed error against gain * band and toggles the heater accordingly.

Q: What does sensor_delay represent?
A: It is the number of ticks between the actual temperature and the value the controller reads.

---

# War of Attrition (Intermediate)

Two-player waiting game exploring mixed strategies and the option value of waiting.
ID: war-of-attrition Tags: game-theory, waiting-game, simpy Probes: contest_length, surplus, cost_variance Context: - Example page: /examples/war-of-attrition - Playground: /playground?example=war-of-attrition - Collection page: /example/collections/game-dynamics - Tag pages: /example/tags/game-theory, /example/tags/waiting-game, /example/tags/simpy ## Parameters ### Default (default) ```yaml prize: 10 mean_cost: 1.0 rounds: 200 seed: 42 ``` ## Simulation Code ```python from tys import probe, progress def simulate(cfg: dict): """Run repeated wars of attrition and track outcomes.""" import random import simpy prize = cfg["prize"] mean_cost = cfg["mean_cost"] rounds = cfg.get("rounds", 100) rng = random.Random(cfg.get("seed")) lengths: list[float] = [] surpluses: list[float] = [] for i in range(rounds): cost_a = rng.expovariate(1 / mean_cost) cost_b = rng.expovariate(1 / mean_cost) env = simpy.Environment() first_quit = env.event() def player(name: str, cost: float): t_quit = rng.expovariate(cost) yield env.timeout(t_quit) if not first_quit.triggered: first_quit.succeed((name, t_quit, cost)) env.process(player("A", cost_a)) env.process(player("B", cost_b)) env.run(until=first_quit) loser, t, loser_cost = first_quit.value winner_cost = cost_b if loser == "A" else cost_a lengths.append(t) total_cost = (loser_cost + winner_cost) * t surpluses.append(prize - total_cost) probe("contest_length", i, t) probe("surplus", i, prize - total_cost) probe("cost_variance", i, abs(cost_a - cost_b)) progress(int(100 * (i + 1) / rounds)) avg_len = sum(lengths) / rounds avg_surplus = sum(surpluses) / rounds return { "avg_length": avg_len, "avg_surplus": avg_surplus, } def requirements(): return { "external": ["simpy==4.1.1"], } ``` ## FAQ Q: How are players' quitting times chosen? A: Each player draws a quit time from an exponential distribution inversely proportional to their cost of waiting. Q: How is the winner determined? A: Whichever player does not trigger first_quit wins the prize when the other leaves. Q: How is surplus calculated? A: The prize minus the sum of both players' waiting costs over the contest length.
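As a quick sanity check on those answers: with quit times drawn as Exp(rate = cost), the contest length min(T_A, T_B) has mean 1/(cost_a + cost_b), so the expected total waiting cost per round is exactly 1 and the average surplus should sit near prize - 1 regardless of mean_cost. The sketch below is a minimal standalone approximation of the example's core draw (no simpy or tys probes); the rounds_check sample size and the seed are illustrative, not part of the example's configuration.

```python
import random

# Standalone check of the war-of-attrition expectations: min of two exponentials
# with rates cost_a and cost_b has mean 1/(cost_a + cost_b), so the expected
# waiting cost per round is (cost_a + cost_b) * 1/(cost_a + cost_b) = 1.
prize, mean_cost = 10, 1.0      # mirrors the Default parameters above
rounds_check = 20_000           # illustrative sample size
rng = random.Random(42)

surplus_total = 0.0
for _ in range(rounds_check):
    cost_a = rng.expovariate(1 / mean_cost)
    cost_b = rng.expovariate(1 / mean_cost)
    contest_length = min(rng.expovariate(cost_a), rng.expovariate(cost_b))  # first quit
    surplus_total += prize - (cost_a + cost_b) * contest_length

print(f"avg surplus ~ {surplus_total / rounds_check:.2f} (analytic prediction: {prize - 1})")
```

With the Default prize of 10, avg_surplus from the full simulation therefore tends to hover around 9 (noisily, since it runs only 200 rounds), which makes it a convenient regression check if the cost or quit-time draws are ever modified.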