Queue Simulation
Message processing system demonstrating queuing theory, system capacity, and throughput limitations.
Level: Beginner
Delays
Learn about delays in systems, how they create oscillations, and their impact on system behavior and decision-making.
Explore Delays
simulation.py
Handling bursts with a shared queue
Our producer's rate rises and falls in a sine wave while consumers work through the backlog. It's a handy way to see how bursts—or a single hot partition—can blow up latency.
import math
from tys import probe, progress
Simulate a bursty producer feeding consumer workers.
def simulate(cfg: dict):
    """Simulate a bursty producer feeding consumer workers.

    The producer's rate follows a sine wave (clamped at zero). Each one-second
    tick the produced messages are spread over the partitions, then every
    consumer drains the partitions it owns, splitting its capacity evenly
    between them.

    Args:
        cfg: Simulation settings. Required keys: ``partitions``,
            ``consumers``, ``producer_rate_base`` (messages/sec),
            ``producer_rate_amp`` (amplitude), ``producer_rate_freq``
            (cycles/sec), ``consumer_rate`` (messages/sec per consumer) and
            ``sim_time`` (number of one-second ticks). Optional keys:
            ``hot_skew`` (fraction of traffic sent to partition 0, default
            0.0 meaning an even split) and ``initial_backlog`` (starting
            backlog of each partition, default 0).

    Returns:
        A dict with ``final_backlog_total``, ``max_partition_backlog`` and
        ``lag_seconds_estimated`` (final backlog divided by the final
        production rate).
    """
    import simpy

    env = simpy.Environment()

    # Parameters
    num_partitions = cfg["partitions"]
    num_consumers = cfg["consumers"]
    base_rate = cfg["producer_rate_base"]  # messages / sec
    amp_rate = cfg["producer_rate_amp"]  # amplitude
    freq_rate = cfg["producer_rate_freq"]  # cycles / sec
    consumer_rate = cfg["consumer_rate"]  # msgs / sec per consumer
    sim_time_seconds = cfg["sim_time"]
    hot_skew = cfg.get("hot_skew", 0.0)
    initial_backlog_each = cfg.get("initial_backlog", 0)

    # State
    backlog = [initial_backlog_each] * num_partitions
    assignment = {p: p % num_consumers for p in range(num_partitions)}

    # Ownership and traffic shares never change during a run, so they are
    # computed once here rather than on every tick.
    owned_by_consumer = [
        [p for p, owner in assignment.items() if owner == c]
        for c in range(num_consumers)
    ]
    shares = _partition_shares(num_partitions, hot_skew)

    # Denominator of the backlog ratio used for the progress narration.
    narration_scale = base_rate * sim_time_seconds
    milestones = (
        (0.25, 5, "25 % utilisation – system healthy"),
        (0.50, 25, "50 % utilisation – backlog building"),
        (0.75, 50, "75 % utilisation – lag becoming serious"),
    )

    # Recorder: periodically report queue length.
    def recorder():
        while True:
            probe("backlog_total", env.now, sum(backlog))
            yield env.timeout(0.5)

    env.process(recorder())
    done = env.event()

    # Helper: instantaneous producer rate following a sine wave.
    def current_production_rate(time_sec: float) -> float:
        wave = amp_rate * math.sin(2 * math.pi * freq_rate * time_sec)
        # Clamp so a large amplitude can never yield a negative rate.
        return max(0.0, base_rate + wave)

    # Main dynamics: produce and consume messages each second.
    def dynamics():
        for t in range(sim_time_seconds):
            rate_now = current_production_rate(t)
            probe("incoming_per_sec", env.now, rate_now)

            # - Produce messages
            for p, share in enumerate(shares):
                backlog[p] += rate_now * share

            # - Consume messages
            outgoing_this_tick = 0
            for owned_parts in owned_by_consumer:
                if not owned_parts:
                    continue
                # A consumer's capacity is divided evenly over its partitions;
                # capacity left unused on one partition is not moved to another.
                slice_capacity = consumer_rate / len(owned_parts)
                for p in owned_parts:
                    take = min(backlog[p], slice_capacity)
                    backlog[p] -= take
                    outgoing_this_tick += take
            probe("outgoing_per_sec", env.now, outgoing_this_tick)

            # - Simple progress narration
            # Skipped when the scale is zero (e.g. base rate of 0), which
            # would otherwise abort the run with a ZeroDivisionError.
            if narration_scale > 0:
                backlog_ratio = sum(backlog) / narration_scale
                for target, percent, message in milestones:
                    if abs(backlog_ratio - target) < 0.01:
                        progress(percent, message)
            if max(backlog) > 10 * consumer_rate:
                progress(
                    int(100 * t / sim_time_seconds),
                    "Warning: hot partition overloaded",
                )

            yield env.timeout(1)

        progress(100)

        final_total = sum(backlog)
        final_rate = current_production_rate(env.now)
        if final_rate > 0:
            lag_seconds = final_total / final_rate
        else:
            # The clamped sine wave can be exactly zero at the end of the run;
            # report an unbounded lag for a non-empty backlog instead of
            # raising and losing the results.
            # NOTE(review): confirm the consumer of this value accepts inf.
            lag_seconds = math.inf if final_total > 0 else 0.0

        done.succeed({
            "final_backlog_total": final_total,
            "max_partition_backlog": max(backlog),
            "lag_seconds_estimated": lag_seconds,
        })

    env.process(dynamics())
    env.run(until=done)
    return done.value


def _partition_shares(num_partitions: int, hot_skew: float) -> list:
    """Return the fraction of each tick's traffic routed to every partition.

    With a positive ``hot_skew`` partition 0 receives that fraction and the
    remaining partitions split the rest evenly. Otherwise traffic is split
    evenly over all partitions, so the shares sum to 1 and the producer never
    emits more (or less) than its current rate.
    """
    if num_partitions <= 1:
        # A lone partition takes all traffic regardless of the skew setting.
        return [1.0] * num_partitions
    if hot_skew > 0:
        cold_share = (1 - hot_skew) / (num_partitions - 1)
        return [hot_skew] + [cold_share] * (num_partitions - 1)
    return [1.0 / num_partitions] * num_partitions
def requirements():
    """Return the package requirements, keyed by "builtin" and "external"."""
    builtin_packages = ["micropip", "pyyaml"]
    external_packages = ["simpy==4.1.1"]
    return dict(builtin=builtin_packages, external=external_packages)
config.yaml
# Number of partitions; the simulation keeps one backlog counter per partition.
partitions: 6
# Number of consumers; partition p is assigned to consumer p % consumers.
consumers: 4
# Baseline producer rate in messages per second (centre of the sine wave).
producer_rate_base: 6000
# Amplitude of the sine wave added to the baseline producer rate.
producer_rate_amp: 3000
# Frequency of the producer's sine wave in cycles per second.
producer_rate_freq: 0.01
# Messages per second each consumer can process, shared across its partitions.
consumer_rate: 4000
# Fraction of produced messages routed to partition 0; the remainder is split
# evenly across the other partitions.
hot_skew: 0.3
# Number of one-second ticks to simulate.
sim_time: 300
# Starting backlog placed on every partition.
initial_backlog: 0