Skip to main content

Get a single flow control key

from qstash import QStash

client = QStash("<QSTASH-TOKEN>")

info = client.flow_control.get("USER_GIVEN_KEY")
print(info.key)
print(info.wait_list_size)
print(info.parallelism_max)
print(info.parallelism_count)
print(info.rate_max)
print(info.rate_count)
print(info.rate_period)
print(info.rate_period_start)
print(info.is_paused)
print(info.is_pinned_parallelism)
print(info.is_pinned_rate)

Get global parallelism

from qstash import QStash

client = QStash("<QSTASH-TOKEN>")

info = client.flow_control.get_global_parallelism()
print(info.parallelism_max)
print(info.parallelism_count)

Pause and resume

from qstash import QStash

client = QStash("<QSTASH-TOKEN>")

# Pause delivery for a flow control key
client.flow_control.pause("USER_GIVEN_KEY")

info = client.flow_control.get("USER_GIVEN_KEY")
print(info.is_paused)  # True

# Resume delivery
client.flow_control.resume("USER_GIVEN_KEY")

Pin a fixed configuration

from qstash import QStash

client = QStash("<QSTASH-TOKEN>")

# Pin parallelism and rate so incoming messages cannot override them
client.flow_control.pin(
    "USER_GIVEN_KEY",
    {"parallelism": 3, "rate": 20, "period": 120},
)

info = client.flow_control.get("USER_GIVEN_KEY")
print(info.is_pinned_parallelism)  # True
print(info.is_pinned_rate)         # True

Unpin configuration

from qstash import QStash

client = QStash("<QSTASH-TOKEN>")

# Unpin parallelism and rate (can unpin independently)
client.flow_control.unpin(
    "USER_GIVEN_KEY",
    {"parallelism": True, "rate": True},
)

Reset rate

from qstash import QStash

client = QStash("<QSTASH-TOKEN>")

# Reset rate count and end the current rate period
client.flow_control.reset_rate("USER_GIVEN_KEY")