Queue Simulation

Message processing system demonstrating queuing theory, system capacity, and throughput limitations.

Level:Beginner

queuethroughputbottleneckarrival-rateprocessing

  • Stocks:backlog[p]
  • Flows:produced, consumed
  • Probes:backlog_total, incoming_per_sec, outgoing_per_sec

Delays

Learn about delays in systems, how they create oscillations, and their impact on system behavior and decision-making.

Explore Delays
simulation.py

Handling bursts with a shared queue

Our producer's rate rises and falls in a sine wave while consumers work through the backlog. It's a handy way to see how bursts—or a single hot partition—can blow up latency.


import math
from tys import probe, progress

Simulate a bursty producer feeding consumer workers.

def simulate(cfg: dict):

    import simpy
    env = simpy.Environment()

Parameters

    num_partitions        = cfg["partitions"]
    num_consumers         = cfg["consumers"]
    base_rate             = cfg["producer_rate_base"]   # messages / sec
    amp_rate              = cfg["producer_rate_amp"]    # amplitude
    freq_rate             = cfg["producer_rate_freq"]   # cycles / sec
    consumer_rate         = cfg["consumer_rate"]        # msgs / sec per consumer
    sim_time_seconds      = cfg["sim_time"]
    hot_skew              = cfg.get("hot_skew", 0.0)
    initial_backlog_each  = cfg.get("initial_backlog", 0)

State

    backlog = [initial_backlog_each] * num_partitions
    assignment = {p: p % num_consumers for p in range(num_partitions)}

Recorder Periodically report queue length.

    def recorder():
        while True:
            total = sum(backlog)
            probe("backlog_total", env.now, total)
            yield env.timeout(0.5)

    env.process(recorder())

    done = env.event()

Helper Instantaneous producer rate following a sine wave.

    def current_production_rate(time_sec: float) -> float:
        return max(
            0.0,
            base_rate + amp_rate * math.sin(2 * math.pi * freq_rate * time_sec),
        )

Main dynamics Produce and consume messages each second.

    def dynamics():
        nonlocal backlog
        for t in range(sim_time_seconds):
            rate_now = current_production_rate(t)
            probe("incoming_per_sec", env.now, rate_now)
  1. Produce messages
            for p in range(num_partitions):
                if p == 0 and hot_skew > 0:
                    share = hot_skew
                else:
                    share = (1 - hot_skew) / (num_partitions - 1 or 1)
                produced = rate_now * share
                backlog[p] += produced
  1. Consume messages
            outgoing_this_tick = 0
            for c in range(num_consumers):
                owned_parts = [p for p, owner in assignment.items() if owner == c]
                if not owned_parts:
                    continue
                slice_capacity = consumer_rate / len(owned_parts)
                for p in owned_parts:
                    take = min(backlog[p], slice_capacity)
                    backlog[p] -= take
                    outgoing_this_tick += take
            probe("outgoing_per_sec", env.now, outgoing_this_tick)
  1. Simple progress narration
            utilisation = sum(backlog) / (base_rate * sim_time_seconds)
            if abs(utilisation - 0.25) < 0.01:
                progress(5,  "25 % utilisation – system healthy")
            if abs(utilisation - 0.50) < 0.01:
                progress(25, "50 % utilisation – backlog building")
            if abs(utilisation - 0.75) < 0.01:
                progress(50, "75 % utilisation – lag becoming serious")
            if max(backlog) > 10 * consumer_rate:
                progress(int(100 * t / sim_time_seconds),
                         "Warning: hot partition overloaded")

            yield env.timeout(1)

        progress(100)
        done.succeed({
            "final_backlog_total":   sum(backlog),
            "max_partition_backlog": max(backlog),
            "lag_seconds_estimated": sum(backlog) / current_production_rate(env.now)
        })

    env.process(dynamics())
    env.run(until=done)
    return done.value


def requirements():
    return {
        "builtin": ["micropip", "pyyaml"],
        "external": ["simpy==4.1.1"],
    }
config.yaml
partitions: 6
consumers: 4
producer_rate_base: 6000
producer_rate_amp: 3000
producer_rate_freq: 0.01
consumer_rate: 4000
hot_skew: 0.3
sim_time: 300
initial_backlog: 0