---
**Concurra** is a lightweight Python library for **concurrent and parallel task execution**, built to simplify the orchestration of complex workflows.
It provides a high-level interface for running tasks using threads or processes, while automatically handling **dependencies, timeouts, errors,** and **fast-fail** behavior.
With built-in support for **dependency management**, you can define execution chains where tasks wait for others to finish, allowing for flexible and safe coordination across multiple workers.
Whether you're handling I/O-bound or CPU-bound operations, Concurra helps you manage concurrency with minimal boilerplate.
---
# Features
- **Simple API**: Add tasks and execute them in parallel with minimal setup.
- **Parallel Task Execution**: Run multiple tasks concurrently using threading or multiprocessing.
- **Dependency Management**: Define task dependencies to ensure correct execution order across complex pipelines.
- **Fast Fail Support**: Stop all tasks as soon as one fails (optional).
- **Error Handling**: Automatically captures exceptions and supports custom logging.
- **Progress & Status Tracking**: Track execution status and view structured results.
- **Background Execution**: Run tasks asynchronously and fetch results later.
- **Multiprocessing Support**: Bypass the GIL for CPU-bound tasks using true parallelism.
- **Abort Support**: Gracefully abort background task execution.
- **Timeouts**: Set a timeout per task to prevent long-running executions.
---
## Why Not Use Native Threading or Multiprocessing?
Python offers several ways to run tasks concurrently: `threading`, `multiprocessing`, `asyncio`, and executors like `ThreadPoolExecutor`. These are powerful tools, but they come with steep learning curves, hidden complexities, and minimal guardrails, especially when managing multiple interdependent tasks.
**Concurra** builds on top of these foundations to provide a **clean, opinionated abstraction** that simplifies concurrent execution, **dependency management**, and runtime control, so you can focus on *what* to execute rather than *how*. Acting as a smart orchestration layer, Concurra emphasizes **safe**, **structured**, and **configurable concurrency**, enabling developers to build **reliable task pipelines** without reinventing the wheel.
Concurra models task dependencies using principles of a **Directed Acyclic Graph (DAG)**. Each task declares its dependencies, and Concurra ensures correct execution order by resolving these relationships dynamically at runtime.
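
The ordering idea can be illustrated with the standard library's `graphlib.TopologicalSorter` (Python 3.9+). This is only a sketch of the DAG principle, not Concurra's internal implementation; the `execution_waves` helper and the task labels are hypothetical.

```python
from graphlib import TopologicalSorter


def execution_waves(dependencies):
    """Group task labels into waves; a wave's tasks have all prerequisites done.

    `dependencies` maps each label to the list of labels it depends on.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph contains a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose dependencies are complete
        waves.append(ready)
        sorter.done(*ready)  # mark them finished so their dependents become ready
    return waves


# Hypothetical pipeline: "transform" waits for both extracts, "load" waits for "transform".
pipeline = {
    "extract_a": [],
    "extract_b": [],
    "transform": ["extract_a", "extract_b"],
    "load": ["transform"],
}
waves = execution_waves(pipeline)
print(waves)  # [['extract_a', 'extract_b'], ['transform'], ['load']]
```

Tasks within one wave have no dependencies on each other, so they are the ones a runner could safely execute in parallel.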
| Challenge Using Native APIs | How Concurra Solves It |
| ------------------------------------------------- | ---------------------------------------------------------- |
| Setting up thread/process pools | ✅ Built-in with `max_concurrency`, no boilerplate |
| Handling exceptions from worker threads/processes | ✅ Automatically captured, logged, and available in results |
| Task identification | ✅ Assign unique labels for tracking and debugging |
| Terminating long-running or stuck tasks | ✅ Built-in timeout and `abort()` support |
| Ensuring a task runner is only used once | ✅ Enforced internally, no accidental reuse |
| Progress logging | ✅ Automatic progress display and task status updates |
| Fast fail if a task breaks | ✅ Opt-in `fast_fail` support for early termination |
| Safe background execution | ✅ `execute_in_background()` and `get_background_results()` |
| Verifying task success | ✅ One-call `verify()` to ensure everything worked |
| Preventing duplicate task labels | ✅ Built-in validation |
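
For comparison, here is roughly what pool setup, task identification, and exception capture look like with only the standard library. It is a sketch of the manual bookkeeping involved, not Concurra's code; the `run_labeled` helper and its result keys are illustrative choices.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_labeled(tasks, max_workers=4):
    """Run {label: (func, args)} in a thread pool and capture errors per label."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to its label so results can be identified later.
        futures = {
            pool.submit(func, *args): label
            for label, (func, args) in tasks.items()
        }
        for future in as_completed(futures):
            label = futures[future]
            try:
                results[label] = {"result": future.result(), "error": None, "has_failed": False}
            except Exception as exc:  # worker exceptions only surface on .result()
                results[label] = {"result": None, "error": repr(exc), "has_failed": True}
    return results


def divide(x, y):
    return x / y


outcome = run_labeled({"ok": (divide, (10, 2)), "bad": (divide, (10, 0))})
print(outcome["ok"]["result"], outcome["bad"]["has_failed"])  # 5.0 True
```

Even this small version has to track the future-to-label mapping and wrap every `.result()` call by hand, and it still lacks timeouts, dependencies, and fast-fail.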
---
## Why Developers Love Concurra
- ***Fewer bugs:*** No manual thread/process management.
- ***More control:*** Configure concurrency, fast-fail, timeout, and logging easily.
- ***Safer pipelines:*** Tasks execute only when dependencies are met.
- ***Better visibility:*** Structured results help with monitoring and debugging.
- ***Great for pipelines:*** Ideal for data processing, test automation, ETL jobs, and more.
Whether you're running 3 tasks or 300, Concurra gives you composability, clarity, and control, all while making concurrent execution feel intuitive and safe.
---
# Installation
```bash
pip install concurra
```
---
# Quick Start
Run your first parallel tasks in under a minute with Concurra.
This quick guide will walk you through how to:
- Set up a `TaskRunner` for concurrent execution
- Add tasks using any Python function
- Run and collect results with minimal boilerplate
---
***Step 1: Create a `TaskRunner` object***
Configure parallelism and behavior like maximum concurrency or timeout.
```python
import concurra
runner = concurra.TaskRunner(max_concurrency=2)
```
---
***Step 2: Add your tasks***
Use `.add_task()` to queue up any callable with a label.
```python
def say_hello():
    return "Hello World"


def say_universe():
    return "Hello Universe"


runner.add_task(say_hello, label="greet_world")
runner.add_task(say_universe, label="greet_universe")
```
---
***Step 3: Run tasks and collect results***
Use `.run()` to execute the tasks concurrently and retrieve structured results.
```python
results = runner.run()
print(results)
```
Output:
```json
{
    "greet_world": {
        "task_name": "say_hello",
        "status": "Successful",
        "result": "Hello World",
        "has_failed": false
    },
    "greet_universe": {
        "task_name": "say_universe",
        "status": "Successful",
        "result": "Hello Universe",
        "has_failed": false
    }
}
```
***Important Notes:***
- A `TaskRunner` object can be run only once.
- Once `run()` or `execute_in_background()` is called, you cannot add more tasks.
- For a new batch of parallel tasks, create a new `TaskRunner` object and add the required tasks.
---
# API Reference
### `TaskRunner` Class
When initializing `TaskRunner`, you can customize behavior using the following parameters:
```python
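import logging

import concurra

my_logger = logging.getLogger("my_app")  # any standard logging.Logger instance

runner = concurra.TaskRunner(
    max_concurrency=4,
    name="MyRunner",
    timeout=10,
    progress_stats=True,
    fast_fail=True,
    use_multiprocessing=False,
    logger=my_logger,
    log_errors=True,
)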
```
***Parameter Reference:***
- **`max_concurrency` (int)**: Maximum number of tasks allowed to run in parallel. Defaults to `os.cpu_count()` if not specified.
- **`name` (str)**: Optional name for the runner instance, used in logs and display outputs.
- **`timeout` (float)**: Maximum duration (in seconds) for any task to complete. Tasks exceeding this are terminated.
- **`progress_stats` (bool)**: Whether to show real-time task progress in the console. Defaults to `True`.
- **`fast_fail` (bool)**: If `True`, execution halts as soon as any task fails. Remaining tasks are aborted.
- **`use_multiprocessing` (bool)**: Use multiprocessing (separate processes) instead of multithreading. Recommended for CPU-bound tasks.
- **`logger` (Logger)**: Custom Python `Logger` instance. If not provided, a default logger is used.
- **`log_errors` (bool)**: Whether to log exceptions that occur during task execution to the logger.
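
To see why a built-in `timeout` is convenient, consider how the standard library behaves on its own: `Future.result(timeout=...)` stops *waiting*, but the worker thread keeps running. The sketch below uses only `concurrent.futures`; `slow_task` is a hypothetical function, and this is not how Concurra enforces its timeout.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def slow_task():
    time.sleep(0.3)
    return "finished"


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_task)
    try:
        future.result(timeout=0.05)
        timed_out = False
    except FutureTimeout:
        timed_out = True  # the caller gave up waiting...
    # Leaving the `with` block still waits for the thread: it was never stopped.

still_completed = future.result()  # ...but the task ran to completion anyway
print(timed_out, still_completed)  # True finished
```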
---
### `add_task()` Method
Use `.add_task()` to queue up functions to run concurrently.
```python
runner.add_task(some_function, arg1, arg2, label="task1", kwarg1=value1)
```
You can also specify dependencies between tasks to ensure correct execution order.
```python
runner.add_task(some_function, arg1, arg2, label="task2", depends_on=["task1"], kwarg1=value1)
```
***Parameter Reference:***
- **`task` (callable)**: The function or callable you want to execute in parallel.
- **`*args`**: Positional arguments to pass to the task.
- **`label` (str)**: A unique identifier for the task. If not provided, the task's ID number is used.
- **`depends_on` (list of str, optional)**: A list of labels that this task depends on. The task will execute only after all its dependencies are complete.
- **`**kwargs`**: Additional keyword arguments passed to the task.
***Notes***
- Task labels must be unique per `TaskRunner` instance. Re-using a label raises a `ValueError`.
- A task cannot depend on itself.
- Immediate circular dependencies are detected and rejected (e.g., A → B and B → A).
This allows you to control execution order when tasks have prerequisites.
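
The rules in the notes above can be sketched as a small validator. This is an assumption about how such checks might look, not Concurra's source; `TaskRegistry` and `register` are hypothetical names, and the sketch happens to allow a dependency on a label that has not been added yet, which Concurra may or may not permit.

```python
class TaskRegistry:
    """Hypothetical registry enforcing the label and dependency rules above."""

    def __init__(self):
        self.depends_on = {}  # label -> list of labels it depends on

    def register(self, label, depends_on=()):
        depends_on = list(depends_on)
        if label in self.depends_on:
            raise ValueError(f"Duplicate task label: {label!r}")
        if label in depends_on:
            raise ValueError(f"Task {label!r} cannot depend on itself")
        for dep in depends_on:
            # Immediate cycle: the dependency already lists this label as a prerequisite.
            if label in self.depends_on.get(dep, []):
                raise ValueError(f"Circular dependency between {label!r} and {dep!r}")
        self.depends_on[label] = depends_on


def is_rejected(registry, label, depends_on=()):
    """Return True if registering the task raises ValueError."""
    try:
        registry.register(label, depends_on)
    except ValueError:
        return True
    return False


registry = TaskRegistry()
registry.register("A", depends_on=["B"])
print(is_rejected(registry, "A"))         # True: duplicate label
print(is_rejected(registry, "C", ["C"]))  # True: self-dependency
print(is_rejected(registry, "B", ["A"]))  # True: A -> B and B -> A
print(is_rejected(registry, "D", ["A"]))  # False: valid task
```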
---
### `run()` Method
When you call `.run()` on a `TaskRunner` object, you can customize its behavior using the following parameters:
```python
results = runner.run(
    verify=True,
    raise_exception=False,
    error_message="Custom failure message"
)
```
***Parameter Reference:***
- **`verify` (bool)**: Whether to automatically check if all tasks succeeded after execution. If any task failed, it logs a report or raises an exception, depending on `raise_exception`.
- **`raise_exception` (bool)**: If `True`, raises a Python `Exception` when any task fails. If `False`, failures are logged but not raised.
- **`error_message` (str)**: Optional custom message to include if `raise_exception=True` and an error occurs.
These options are useful when you're integrating Concurra into pipelines, tests, or automated workflows and need fine-grained error control.
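
If you keep `raise_exception=False` and want to handle failures yourself, the returned dict can be inspected directly. A minimal sketch, assuming the result structure shown in this README's examples (`has_failed`, `error`); `failed_tasks` is a hypothetical helper, not part of Concurra, and the sample data is hand-written for illustration.

```python
def failed_tasks(results):
    """Return {label: error} for every task whose entry has `has_failed` set."""
    return {
        label: info.get("error")
        for label, info in results.items()
        if info.get("has_failed")
    }


# Hand-written sample shaped like the results dict, for illustration only.
sample = {
    "square_4": {"status": "Successful", "result": 16, "error": None, "has_failed": False},
    "divide_by_zero": {
        "status": "Failed",
        "result": None,
        "error": "ZeroDivisionError: division by zero",
        "has_failed": True,
    },
}
failures = failed_tasks(sample)
print(failures)  # {'divide_by_zero': 'ZeroDivisionError: division by zero'}
```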
---
### `execute_in_background()` Method
Starts executing tasks in the background without blocking the main thread. Useful when you want to initiate task execution and continue doing other things before fetching results later.
```python
runner.execute_in_background()
# ... continue with other work ...
```
***Notes***
- This method does not return task results immediately.
- Once background execution starts, no new tasks can be added to the runner.
- Use `get_background_results()` to collect results once execution is complete.
---
### `is_running()` Method
Checks whether the `TaskRunner` is currently executing tasks in the background.
Use this method to **poll or monitor execution state**, especially after calling `execute_in_background()`.
```python
if runner.is_running():
    print("Tasks are still running...")
else:
    print("All tasks are done!")
```
***Returns: (bool)***
- Returns `True` if task execution is in progress.
- Returns `False` if all tasks have completed or if `.run()` / `get_background_results()` has already returned.
---
### `get_background_results()` Method
Fetches and returns results after background execution has started using `execute_in_background()`.
This call **blocks until all tasks are complete**, so manual polling with `is_running()` is **not necessary**.
```python
results = runner.get_background_results(
    verify=True,
    raise_exception=False,
    error_message="Something went wrong"
)
```
***Parameter Reference:***
- **`verify` (bool, optional)**: Whether to automatically check if all tasks succeeded after execution.
- **`raise_exception` (bool, optional)**: If `True`, raises a Python `Exception` when any task fails. If `False`, failures are logged but not raised.
- **`error_message` (str, optional)**: Custom message to include if `raise_exception=True` and an error occurs.
Example:
```python
runner = concurra.TaskRunner()
runner.add_task(func1, label="t1")
runner.add_task(func2, label="t2")
runner.execute_in_background()
# do other stuff here...
# No need to poll with is_running(): get_background_results() blocks until all tasks finish
results = runner.get_background_results()
```
***Notes***
- `get_background_results()` blocks until all tasks are finished, so there is no need to poll with `is_running()`.
- Results returned are identical in structure to those from `.run()`.
- Calling `get_background_results()` without first calling `execute_in_background()` raises an error.
---
### `abort()` Method
Gracefully terminates all currently running background tasks.
Use this method only when you've started execution with `execute_in_background()` and want to cancel the operation before it finishes.
```python
runner.abort()
```
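
One way to combine `is_running()` and `abort()` is a deadline-based wait. The sketch below relies only on those two documented methods; `wait_or_abort` is a hypothetical helper, and `_StubRunner` is a stand-in so the example runs without Concurra installed.

```python
import time


def wait_or_abort(runner, max_wait, poll_interval=0.1):
    """Poll runner.is_running(); call runner.abort() once max_wait seconds pass.

    Returns True if the runner finished on its own, False if it was aborted.
    """
    deadline = time.monotonic() + max_wait
    while runner.is_running():
        if time.monotonic() >= deadline:
            runner.abort()
            return False
        time.sleep(poll_interval)
    return True


class _StubRunner:
    """Stand-in exposing only is_running() and abort(), for demonstration."""

    def __init__(self, finish_after):
        self._finish_at = time.monotonic() + finish_after
        self.aborted = False

    def is_running(self):
        return not self.aborted and time.monotonic() < self._finish_at

    def abort(self):
        self.aborted = True


quick = _StubRunner(finish_after=0.05)
stuck = _StubRunner(finish_after=60)
finished_quick = wait_or_abort(quick, max_wait=1.0, poll_interval=0.01)
finished_stuck = wait_or_abort(stuck, max_wait=0.1, poll_interval=0.01)
print(finished_quick, finished_stuck)  # True False
```

With a real runner, you would call `runner.execute_in_background()` first and pass the runner itself to `wait_or_abort`.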
---
# Example: All Tasks Pass
```python
import concurra
import time
import json
def square(x):
    time.sleep(1)
    return x * x

def divide(x, y):
    return x / y

runner = concurra.TaskRunner(max_concurrency=4) # Uses 4 workers
runner.add_task(square, 4, label="square_4")
runner.add_task(square, 5, label="square_5")
runner.add_task(divide, 10, 2, label="divide_10_2")
results = runner.run()
print(json.dumps(results, indent=4))
```
***Console Output:***
```
INFO:concurra.core:Concurra progress: [########.................] 1/3 [33.33%] in 0 min 0.0 sec
INFO:concurra.core:Concurra progress: [#################........] 2/3 [66.67%] in 0 min 1.04 sec
INFO:concurra.core:Concurra progress: [#########################] 3/3 [100.0%] in 0 min 1.04 sec
INFO:concurra.core:
+-------------+--------+------------+------------+
| label | task | status | duration |
|-------------+--------+------------+------------|
| square_4 | square | Successful | 1.01 sec |
| square_5 | square | Successful | 1.01 sec |
| divide_10_2 | divide | Successful | 0.0 sec |
+-------------+--------+------------+------------+
```
***Output Results dict:***
```python
print(json.dumps(results, indent=4))
```
```json
{
    "square_4": {
        "task_name": "square",
        "start_time": "2025-04-12 00:46:54",
        "end_time": "2025-04-12 00:46:55",
        "duration": "1.01 sec",
        "duration_seconds": 1.01,
        "result": 16,
        "error": null,
        "trace": null,
        "status": "Successful",
        "has_failed": false
    },
    "square_5": {
        "task_name": "square",
        "start_time": "2025-04-12 00:46:54",
        "end_time": "2025-04-12 00:46:55",
        "duration": "1.01 sec",
        "duration_seconds": 1.01,
        "result": 25,
        "error": null,
        "trace": null,
        "status": "Successful",
        "has_failed": false
    },
    "divide_10_2": {
        "task_name": "divide",
        "start_time": "2025-04-12 00:46:54",
        "end_time": "2025-04-12 00:46:54",
        "duration": "0.0 sec",
        "duration_seconds": 0.0,
        "result": 5.0,
        "error": null,
        "trace": null,
        "status": "Successful",
        "has_failed": false
    }
}
```
---
# Example: Partial Failures
```python
import concurra
import time
import json
def square(x):
    time.sleep(1)
    return x * x

def divide(x, y):
    return x / y

runner = concurra.TaskRunner(max_concurrency=4)
runner.add_task(square, 4, label="square_4")
runner.add_task(square, 5, label="square_5")
runner.add_task(divide, 10, 2, label="divide_10_2")
runner.add_task(divide, 10, 0, label="divide_by_zero") # This will fail
results = runner.run()
print(json.dumps(results, indent=4))
```
***Console Output:***
```
INFO:concurra.core:Concurra progress: [######...................] 1/4 [25.0%] in 0 min 0.0 sec
INFO:concurra.core:Concurra progress: [############.............] 2/4 [50.0%] in 0 min 0.1 sec
INFO:concurra.core:Concurra progress: [###################......] 3/4 [75.0%] in 0 min 1.04 sec
INFO:concurra.core:Concurra progress: [#########################] 4/4 [100.0%] in 0 min 1.04 sec
ERROR:concurra.core:Execution Failed
+----------------+--------+------------+------------+
| label | task | status | duration |
|----------------+--------+------------+------------|
| square_4 | square | Successful | 1.0 sec |
| square_5 | square | Successful | 1.01 sec |
| divide_10_2 | divide | Successful | 0.0 sec |
| divide_by_zero | divide | Failed | 0.0 sec |
+----------------+--------+------------+------------+
Task 'divide_by_zero' failed with error: ZeroDivisionError: division by zero
Traceback (most recent call last):
File "../concurra/concurra/core.py", line 52, in run
result = self.task_handler.run()
^^^^^^^^^^^^^^^^^^^^^^^
File "../concurra/concurra/core.py", line 207, in run
return self.task(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "