CocoIndex is designed to be production-ready from day one—built to process data in parallel, maximizing throughput while keeping your systems safe. Today, we’ll look at how to optimize performance without overloading your environment. With CocoIndex, it’s just one configuration away.
🌟 Star CocoIndex if you like it – https://github.com/cocoindex-io/cocoindex
CocoIndex’s parallelism model boosts speed by processing multiple data items at once, but more parallelism isn’t always better. Left unconstrained, excessive concurrency can strain—or even destabilize—your systems. That’s why CocoIndex includes built-in concurrency control mechanisms that strike the right balance between raw performance and system stability, even at massive scale.
Processing too many items simultaneously can cause:
- Memory exhaustion – large datasets loaded at once consume massive amounts of RAM.
- Resource contention – CPU, disk I/O, and network bandwidth get overwhelmed by competing operations.
- System instability – timeouts, degraded performance, or outright crashes.
Unlike generic concurrency features, CocoIndex lets you:
- Constrain both data volume (rows) and memory usage (bytes).
- Set limits at multiple layers: global, per source, and per-row iteration.
- Combine controls: all specified constraints must be satisfied before processing proceeds.
This layered approach ensures that resource-heavy sources don’t overwhelm the system, and nested tasks (such as splitting documents into chunks) remain predictable and safe.
See the CocoIndex documentation for full details. CocoIndex powers users’ production workloads at the scale of millions.
Concurrency Options
CocoIndex provides two primary settings:
Option | Purpose | Unit |
---|---|---|
max_inflight_rows | Maximum number of rows processed concurrently. | rows |
max_inflight_bytes | Maximum memory footprint of concurrently processed data (before transformations). | bytes |
When a limit is reached, CocoIndex pauses new processing until some existing work completes. This keeps throughput high without pushing your system past its limits.
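The pause-and-resume behavior can be pictured as an admission gate that tracks in-flight rows and bytes. The sketch below is a toy model in plain asyncio, not CocoIndex's implementation: the InflightBudget class, its method names, and the choice to let a single oversized row through when nothing else is in flight are all assumptions made for illustration.

```python
import asyncio


class InflightBudget:
    """Toy admission gate capping concurrent rows and bytes (hypothetical, not CocoIndex code)."""

    def __init__(self, max_rows=None, max_bytes=None):
        self.max_rows = max_rows
        self.max_bytes = max_bytes
        self.rows = 0
        self.bytes = 0
        self._cond = asyncio.Condition()

    def _fits(self, size):
        rows_ok = self.max_rows is None or self.rows + 1 <= self.max_rows
        # Assumption of this sketch: a row larger than the byte budget is
        # admitted when nothing else is in flight, so it cannot wait forever.
        bytes_ok = (
            self.max_bytes is None
            or self.bytes == 0
            or self.bytes + size <= self.max_bytes
        )
        return rows_ok and bytes_ok  # every configured limit must pass

    async def acquire(self, size):
        async with self._cond:
            # Pause here until some in-flight work completes and frees budget.
            await self._cond.wait_for(lambda: self._fits(size))
            self.rows += 1
            self.bytes += size

    async def release(self, size):
        async with self._cond:
            self.rows -= 1
            self.bytes -= size
            self._cond.notify_all()


async def run_demo(sizes, max_rows, max_bytes):
    budget = InflightBudget(max_rows, max_bytes)
    peak = {"rows": 0, "bytes": 0}

    async def process(size):
        await budget.acquire(size)
        try:
            peak["rows"] = max(peak["rows"], budget.rows)
            peak["bytes"] = max(peak["bytes"], budget.bytes)
            await asyncio.sleep(0.01)  # stand-in for real work
        finally:
            await budget.release(size)

    await asyncio.gather(*(process(s) for s in sizes))
    return peak


peak = asyncio.run(run_demo([400, 400, 400, 100, 100], max_rows=3, max_bytes=1000))
print(peak)
```

In this run the third 400-byte row waits on the byte limit while a smaller row is admitted, and the fifth row waits on the row limit, so neither peak exceeds its configured cap.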
Where to Apply Concurrency Controls
1. Source Level
Controls how many rows from a data source are processed simultaneously. This prevents overwhelming your system when ingesting large datasets.
Source-level control happens at two granularities:
- Global: all sources across all indexing flows share one budget.
- Per-source: each source has its own budget.
Both global and per-source limits must pass before a new row is processed—providing two layers of safety.
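As a rough illustration of the two-layer check, the sketch below admits a row only when both a shared global budget and the source's own budget have room. The Budget class and try_admit function are hypothetical names for this example and do not mirror CocoIndex internals.

```python
from dataclasses import dataclass


@dataclass
class Budget:
    """Hypothetical row budget: a cap plus a count of rows currently in flight."""
    max_rows: int
    inflight: int = 0

    def has_room(self) -> bool:
        return self.inflight < self.max_rows


def try_admit(global_budget: Budget, source_budget: Budget) -> bool:
    """Admit one row only if both the global and the per-source budget allow it."""
    if global_budget.has_room() and source_budget.has_room():
        global_budget.inflight += 1
        source_budget.inflight += 1
        return True
    return False


shared = Budget(max_rows=3)    # one budget shared by every source
source_a = Budget(max_rows=2)  # per-source budgets
source_b = Budget(max_rows=2)

results = [
    try_admit(shared, source_a),  # admitted
    try_admit(shared, source_a),  # admitted
    try_admit(shared, source_a),  # rejected: source A is at its own limit
    try_admit(shared, source_b),  # admitted: global budget is now full
    try_admit(shared, source_b),  # rejected: global limit reached
]
print(results)
```

The third attempt is blocked by the per-source limit and the fifth by the global limit, which is the "both must pass" behavior described above.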
Global Concurrency: One Setting to Shield All Flows
Global limits ensure your system never overshoots safe operating thresholds, even if individual flows attempt higher concurrency.
Apply system-wide protections through environment variables or programmatically. Environment variables are the easiest route:
COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS=256
COCOINDEX_SOURCE_MAX_INFLIGHT_BYTES=1048576
Programmatically, pass the limits when calling cocoindex.init(); these values take precedence over the environment variables:
import cocoindex
from cocoindex import GlobalExecutionOptions

cocoindex.init(
    cocoindex.Settings(
        ...,  # other settings elided
        global_execution_options=GlobalExecutionOptions(
            source_max_inflight_rows=256,
            source_max_inflight_bytes=1_048_576,  # 1 MiB
        ),
    )
)
If you don’t set it explicitly, CocoIndex currently uses 1024 as the default global limit on in-flight source rows.
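The precedence described in this section (explicit setting first, then the environment variable, then the default of 1024) can be sketched as a small resolver. This is only an illustration of the ordering: resolve_source_max_inflight_rows is a hypothetical helper, not a CocoIndex function, and how CocoIndex parses the variable internally is not shown here.

```python
import os

DEFAULT_SOURCE_MAX_INFLIGHT_ROWS = 1024  # default stated in this article


def resolve_source_max_inflight_rows(explicit=None, environ=None):
    """Hypothetical helper: explicit value, then environment variable, then default."""
    environ = os.environ if environ is None else environ
    if explicit is not None:
        return explicit
    raw = environ.get("COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS")
    if raw:
        return int(raw)
    return DEFAULT_SOURCE_MAX_INFLIGHT_ROWS


env = {"COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS": "256"}
print(resolve_source_max_inflight_rows(explicit=64, environ=env))  # explicit setting wins
print(resolve_source_max_inflight_rows(environ=env))               # falls back to the env var
print(resolve_source_max_inflight_rows(environ={}))                # falls back to the default
```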
Per-Source Concurrency: Granular Customization
Set different limits for each source according to workload and data characteristics:
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    data_scope["documents"] = flow_builder.add_source(
        DemoSourceSpec(...),
        max_inflight_rows=10,
        max_inflight_bytes=100 * 1024 * 1024,  # 100 MiB
    )
2. Nested Iteration Level Concurrency: Deep Structural Control
When processing nested rows, such as processing each chunk of each document, you can configure the maximum concurrent rows and/or bytes:
with data_scope["documents"].row() as doc:
    doc["chunks"] = doc["content"].transform(SplitRecursively(...))
    with doc["chunks"].row(max_inflight_rows=100, max_inflight_bytes=100 * 1000 * 1000):
        # Process up to 100 chunks in parallel per document
        ...
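Because the nested limit applies per document, it helps to estimate the combined upper bound when choosing values. The arithmetic below assumes the per-source example above (10 documents in flight) and that each document gets its own nested budget, as the "per document" comment suggests; worst_case_inflight_chunks is a hypothetical helper for this estimate.

```python
def worst_case_inflight_chunks(max_inflight_docs: int, max_inflight_chunks_per_doc: int) -> int:
    """Upper bound on concurrently processed chunks, assuming one nested budget per document."""
    return max_inflight_docs * max_inflight_chunks_per_doc


# 10 documents in flight at the source, up to 100 chunks in flight per document.
bound = worst_case_inflight_chunks(10, 100)
print(bound)  # 1000
```

If that bound is more than a downstream model or API can absorb, lowering either limit shrinks it proportionally.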
Summary Table: Concurrency Configuration in CocoIndex
Level | Configuration Path | Applies To |
---|---|---|
Global | Environment variables, or pass GlobalExecutionOptions to cocoindex.init() | All sources, all flows, added together |
Per-Source | Arguments to FlowBuilder.add_source() | Specific source/flow |
Row Iteration | Arguments to DataSlice.row() | Nested iterations |
In real incremental pipelines, the bottleneck is usually a few heavy operations, such as running inference with an AI model locally or through a remote API. It’s common to hold more data in memory than can be processed immediately, so that when a busy backend frees up, new work is ready to go and the backend stays busy. That buffer still needs a reasonable bound to prevent memory exhaustion and similar issues. That’s where concurrency control comes in.
- In many cases, the default global source max rows limit (1024) is already sufficient. It fits the situation described above: loading more than what heavy operations can consume, but still within a reasonable bound. You don’t need to do anything.
- You can adjust the global source row limit if the default does not work perfectly. For example, if you observe memory overuse or timeouts in certain operations, reduce the limit; on the other hand, if the system is already stable but you want it to run faster, increase the limit to see if it helps.
- If your input sizes vary widely (e.g., they follow a long-tail distribution rather than a normal distribution), setting the max bytes limit can help prevent a small number of abnormally large inputs from overloading the system.
- If you want to run multiple flows within the same process, or have multiple sources within the same flow, and they vary in processing complexity (e.g., one source goes through a very heavy and slow model, and another only does simple data movement), you may leverage per-source control to more strictly manage the heavy path.
- If you have a high number of nested rows to process, and the specific number varies significantly, tweak concurrency control options on nested iterations.
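To see why a byte limit matters for long-tail inputs, compare the worst-case memory that a row limit alone would allow. The sizes below are made-up illustrative numbers, and worst_case_bytes_rows_only is a hypothetical helper that simply sums the largest rows that could be in flight together.

```python
def worst_case_bytes_rows_only(sizes, max_rows):
    """Largest total size that max_rows concurrently loaded rows could reach."""
    return sum(sorted(sizes, reverse=True)[:max_rows])


# Illustrative long-tail input: 1000 small rows of 10 KB and 5 outliers of 500 MB.
sizes = [10_000] * 1000 + [500_000_000] * 5

worst = worst_case_bytes_rows_only(sizes, max_rows=256)
print(worst)  # 2502510000 bytes, about 2.5 GB, dominated by the 5 outliers
```

With a row limit of 256 and no byte limit, the five outliers could all be in flight together. A byte limit on top of the row limit bounds that total no matter how the large rows happen to line up.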
This concurrency control framework gives you safe, scalable, and customizable flow performance. You gain flexibility (configure limits per source and per nested iteration), control (set global limits), and the confidence to scale CocoIndex flows smoothly across diverse workloads.
Support us
We’re constantly improving our runtime. Please ⭐ star CocoIndex on GitHub and share it with others.