Design Bounded Blocking Queue

medium design concurrency semaphore

Problem

Implement a thread-safe bounded blocking queue with a fixed capacity. enqueue(x) adds an element but blocks while the queue is full; dequeue() removes and returns the front element but blocks while the queue is empty; size() returns the current count. Coordinate producers and consumers with two counting semaphores — empty (free slots, starts at capacity) and full (filled slots, starts at 0).

Inputcapacity = 2, ops = enqueue(10), enqueue(20), dequeue(), enqueue(30), dequeue(), dequeue()
Outputdequeue() → 10, 20, 30
The third enqueue(30) waits until a slot frees, then FIFO order is preserved: 10, 20, 30.

from collections import deque
from threading import Semaphore, Lock

class BoundedBlockingQueue:
    def __init__(self, capacity: int):
        self.q = deque()
        self.lock = Lock()
        self.empty = Semaphore(capacity)   # free slots
        self.full = Semaphore(0)           # filled slots

    def enqueue(self, element: int) -> None:
        self.empty.acquire()               # block if full
        with self.lock:
            self.q.append(element)
        self.full.release()                # one more item ready

    def dequeue(self) -> int:
        self.full.acquire()                # block if empty
        with self.lock:
            x = self.q.popleft()
        self.empty.release()               # one slot freed
        return x

    def size(self) -> int:
        with self.lock:
            return len(self.q)
// Semaphore via promise queue (single-threaded event loop model).
class Semaphore {
  constructor(n) { this.n = n; this.waiters = []; }
  async acquire() {
    if (this.n > 0) { this.n--; return; }
    await new Promise(res => this.waiters.push(res));
  }
  release() {
    if (this.waiters.length) this.waiters.shift()();
    else this.n++;
  }
}

class BoundedBlockingQueue {
  constructor(capacity) {
    this.q = [];
    this.empty = new Semaphore(capacity); // free slots
    this.full = new Semaphore(0);         // filled slots
  }
  async enqueue(element) {
    await this.empty.acquire();           // block if full
    this.q.push(element);
    this.full.release();
  }
  async dequeue() {
    await this.full.acquire();            // block if empty
    const x = this.q.shift();
    this.empty.release();
    return x;
  }
  size() { return this.q.length; }
}
import java.util.*;
import java.util.concurrent.Semaphore;

class BoundedBlockingQueue {
    private final Deque<Integer> q = new ArrayDeque<>();
    private final Semaphore empty;   // free slots
    private final Semaphore full;    // filled slots

    public BoundedBlockingQueue(int capacity) {
        empty = new Semaphore(capacity);
        full = new Semaphore(0);
    }

    public void enqueue(int element) throws InterruptedException {
        empty.acquire();             // block if full
        synchronized (q) { q.offerLast(element); }
        full.release();
    }

    public int dequeue() throws InterruptedException {
        full.acquire();              // block if empty
        int x;
        synchronized (q) { x = q.pollFirst(); }
        empty.release();
        return x;
    }

    public int size() {
        synchronized (q) { return q.size(); }
    }
}
#include <deque>
#include <mutex>
#include <condition_variable>

class BoundedBlockingQueue {
    std::deque<int> q;
    std::mutex m;
    std::condition_variable cv;
    int cap;
public:
    BoundedBlockingQueue(int capacity) : cap(capacity) {}

    void enqueue(int element) {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [&]{ return (int)q.size() < cap; }); // block if full
        q.push_back(element);
        cv.notify_all();
    }

    int dequeue() {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [&]{ return !q.empty(); });           // block if empty
        int x = q.front();
        q.pop_front();
        cv.notify_all();
        return x;
    }

    int size() {
        std::unique_lock<std::mutex> lk(m);
        return (int)q.size();
    }
};
Time: O(1) per enqueue/dequeue/size Space: O(capacity)