Queue Simulation

A queue simulation demonstrating throughput limits and system capacity.

Level: Beginner

Tags: queue, throughput, bottleneck, discrete-event, service, capacity

  • Stocks: backlog[p]
  • Flows: produced, consumed
  • Probes: backlog_total, incoming_per_sec, outgoing_per_sec

simulation.py

Handling bursts with a shared queue

Our producer's rate rises and falls in a sine wave while consumers work through the backlog. It's a handy way to see how bursts—or a single hot partition—can blow up latency.
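
Before reading the code, a rough back-of-the-envelope check (separate from simulation.py below, using the default configuration listed further down) shows why the hot partition falls behind during a burst:

# Not part of simulation.py: a standalone sanity check with the default config.
base_rate, amp_rate = 6000, 3000      # producer_rate_base / producer_rate_amp
hot_skew = 0.3                        # share of traffic hitting partition 0
num_partitions, num_consumers = 6, 4
consumer_rate = 4000                  # msgs / sec per consumer

peak_rate = base_rate + amp_rate                  # sine peak: 9000 msg/s
hot_incoming = peak_rate * hot_skew               # partition 0 sees 2700 msg/s at the peak

# Round-robin assignment (p % num_consumers): consumer 0 owns partitions 0 and 4,
# so its 4000 msg/s capacity is split in half per partition.
owned = [p for p in range(num_partitions) if p % num_consumers == 0]
slice_capacity = consumer_rate / len(owned)       # 2000 msg/s available to partition 0

print(hot_incoming > slice_capacity)              # True: the hot partition backs up during bursts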


import math
from tys import probe, progress

# Simulate a bursty producer feeding consumer workers.

def simulate(cfg: dict):

    import simpy
    env = simpy.Environment()

    # Parameters

    num_partitions        = cfg["partitions"]
    num_consumers         = cfg["consumers"]
    base_rate             = cfg["producer_rate_base"]   # messages / sec
    amp_rate              = cfg["producer_rate_amp"]    # amplitude
    freq_rate             = cfg["producer_rate_freq"]   # cycles / sec
    consumer_rate         = cfg["consumer_rate"]        # msgs / sec per consumer
    sim_time_seconds      = cfg["sim_time"]
    hot_skew              = cfg.get("hot_skew", 0.0)
    initial_backlog_each  = cfg.get("initial_backlog", 0)

    # State

    backlog = [initial_backlog_each] * num_partitions
    assignment = {p: p % num_consumers for p in range(num_partitions)}

    # Recorder: periodically report queue length.

    def recorder():
        while True:
            total = sum(backlog)
            probe("backlog_total", env.now, total)
            yield env.timeout(0.5)

    env.process(recorder())

    done = env.event()

    # Helper: instantaneous producer rate following a sine wave.

    def current_production_rate(time_sec: float) -> float:
        return max(
            0.0,
            base_rate + amp_rate * math.sin(2 * math.pi * freq_rate * time_sec),
        )

    # Main dynamics: produce and consume messages each second.

    def dynamics():
        nonlocal backlog
        for t in range(sim_time_seconds):
            rate_now = current_production_rate(t)
            probe("incoming_per_sec", env.now, rate_now)
            # 1. Produce messages
            for p in range(num_partitions):
                if hot_skew > 0 and num_partitions > 1:
                    # Partition 0 is the hot partition; the others split the remainder evenly.
                    share = hot_skew if p == 0 else (1 - hot_skew) / (num_partitions - 1)
                else:
                    share = 1 / num_partitions
                produced = rate_now * share
                backlog[p] += produced
            # 2. Consume messages
            outgoing_this_tick = 0
            for c in range(num_consumers):
                owned_parts = [p for p, owner in assignment.items() if owner == c]
                if not owned_parts:
                    continue
                slice_capacity = consumer_rate / len(owned_parts)
                for p in owned_parts:
                    take = min(backlog[p], slice_capacity)
                    backlog[p] -= take
                    outgoing_this_tick += take
            probe("outgoing_per_sec", env.now, outgoing_this_tick)
            # 3. Simple progress narration
            utilisation = sum(backlog) / (base_rate * sim_time_seconds)
            if abs(utilisation - 0.25) < 0.01:
                progress(5,  "25 % utilisation – system healthy")
            if abs(utilisation - 0.50) < 0.01:
                progress(25, "50 % utilisation – backlog building")
            if abs(utilisation - 0.75) < 0.01:
                progress(50, "75 % utilisation – lag becoming serious")
            if max(backlog) > 10 * consumer_rate:
                progress(int(100 * t / sim_time_seconds),
                         "Warning: hot partition overloaded")

            yield env.timeout(1)

        progress(100)
        final_rate = current_production_rate(env.now)
        done.succeed({
            "final_backlog_total":   sum(backlog),
            "max_partition_backlog": max(backlog),
            # Guard against a zero producer rate at the end of the run.
            "lag_seconds_estimated": sum(backlog) / final_rate if final_rate > 0 else 0.0,
        })

    env.process(dynamics())
    env.run(until=done)
    return done.value


def requirements():
    return {
        "builtin": ["micropip", "pyyaml"],
        "external": ["simpy==4.1.1"],
    }
Default.yaml
partitions: 6
consumers: 4
producer_rate_base: 6000
producer_rate_amp: 3000
producer_rate_freq: 0.01
consumer_rate: 4000
hot_skew: 0.3
sim_time: 300
initial_backlog: 0
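
To run the model outside the hosted runner, one option is to stub the tys probe/progress hooks and feed simulate() the config above. This is a minimal sketch, assuming the code is saved as simulation.py, the config as default.yaml, and that simpy and pyyaml are installed locally; the harness itself is not part of the original example.

# run_local.py - hypothetical local harness, not part of the original example.
import sys
import types

# Stub out the tys module that simulation.py imports, printing probes to stdout.
tys_stub = types.ModuleType("tys")
tys_stub.probe = lambda name, t, value: print(f"{t:7.1f}s  {name} = {value:.1f}")
tys_stub.progress = lambda pct, msg="": print(f"[{pct:3d}%] {msg}")
sys.modules["tys"] = tys_stub

import yaml                          # pyyaml, as listed in requirements()
from simulation import simulate      # assumes the code above is saved as simulation.py

with open("default.yaml") as fh:     # the config shown above
    cfg = yaml.safe_load(fh)

print(simulate(cfg))                 # final metrics dict
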
Charts (Default)

backlog_total

Samples: 601 @ 0.00–300.00
Values: min 0.00, mean 7649.57, median 5634.14, max 19362.56, σ 7418.31

incoming_per_sec

Samples: 300 @ 0.00–299.00
Values: min 3000.00, mean 6000.00, median 6000.00, max 9000.00, σ 2121.32

outgoing_per_sec

Samples: 300 @ 0.00–299.00
Values: min 3005.92, mean 6000.00, median 6128.26, max 8300.00, σ 1718.71

Final Results (Default)

Metric                   Value
final_backlog_total      0.00
max_partition_backlog    0.00
lag_seconds_estimated    0.00
FAQ
How does the producer rate vary?
current_production_rate follows a sine wave, so bursts arrive in periodic cycles: the rate swings between producer_rate_base - producer_rate_amp and producer_rate_base + producer_rate_amp with a period of 1 / producer_rate_freq seconds, and never drops below zero.
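
For example, with the default parameters the rate cycles every 1 / 0.01 = 100 seconds between 3000 and 9000 msg/s, matching the incoming_per_sec stats above:

import math

base, amp, freq = 6000, 3000, 0.01   # default.yaml values

def rate(t):
    return max(0.0, base + amp * math.sin(2 * math.pi * freq * t))

print(rate(0), rate(25), rate(75))   # ~6000 (baseline), ~9000 (peak), ~3000 (trough)
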
How are consumers assigned partitions?
Partitions are assigned round-robin (p % num_consumers), so each worker pulls from its own fixed set of partitions and splits its consumer_rate evenly across them.
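
With the defaults (6 partitions, 4 consumers) the round-robin map works out like this:

num_partitions, num_consumers = 6, 4
assignment = {p: p % num_consumers for p in range(num_partitions)}
print(assignment)   # {0: 0, 1: 1, 2: 2, 3: 3, 4: 0, 5: 1}
# Consumers 0 and 1 own two partitions each, so their per-partition slice is consumer_rate / 2.
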
How does the simulation warn about a hot partition?
A progress message ("Warning: hot partition overloaded") fires whenever any single partition's backlog exceeds ten times the per-consumer rate.
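
A minimal illustration of that check; the backlog snapshot is hypothetical, only the condition itself comes from dynamics():

consumer_rate = 4000                              # from default.yaml
backlog = [41000, 200, 150, 180, 90, 60]          # hypothetical snapshot; partition 0 is hot
if max(backlog) > 10 * consumer_rate:             # threshold: 40000 messages
    print("Warning: hot partition overloaded")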