| endpoint | bulk |
|---|---|
| lang | python |
| es_version | 9.3 |
| client | elasticsearch==9.3.0 |
Use elasticsearch.helpers.bulk to index multiple documents in a single
request. This helper accepts a generator, keeping memory usage constant
regardless of dataset size.
from elasticsearch.helpers import bulk
def product_actions():
    """Return an iterator of bulk index actions for the demo catalog.

    Each action targets the ``products`` index with a sequential id
    ``prod-1`` ... ``prod-8`` and carries one product document as its
    ``_source``.
    """
    catalog = [
        {"name": "Espresso Machine Pro", "brand": "BrewMaster", "price": 899.99, "category": "appliances", "in_stock": True, "rating": 4.7},
        {"name": "Noise-Cancelling Headphones", "brand": "SoundCore", "price": 249.00, "category": "electronics", "in_stock": True, "rating": 4.5},
        {"name": "Ergonomic Standing Desk", "brand": "DeskCraft", "price": 599.00, "category": "furniture", "in_stock": False, "rating": 4.8},
        {"name": "4K Webcam with Mic", "brand": "StreamGear", "price": 129.99, "category": "electronics", "in_stock": True, "rating": 4.3},
        {"name": "Cast Iron Dutch Oven", "brand": "HearthStone", "price": 79.95, "category": "cookware", "in_stock": True, "rating": 4.9},
        {"name": "Mechanical Keyboard", "brand": "TypeForce", "price": 169.00, "category": "electronics", "in_stock": True, "rating": 4.6},
        {"name": "Air Purifier HEPA-13", "brand": "CleanAir", "price": 349.00, "category": "appliances", "in_stock": True, "rating": 4.4},
        {"name": "Bamboo Cutting Board Set", "brand": "HearthStone", "price": 34.99, "category": "cookware", "in_stock": True, "rating": 4.2},
    ]
    # A lazy generator expression keeps memory flat no matter the catalog size.
    return (
        {"_index": "products", "_id": f"prod-{num}", "_source": doc}
        for num, doc in enumerate(catalog, 1)
    )
# Feed the generator straight to the helper; it batches documents into
# bulk requests and returns (count of successes, list of error infos).
success, errors = bulk(client, product_actions())
print(f"Indexed {success} documents")

Each action dict must include _index and _source. The _id field is
optional — omit it to let Elasticsearch generate IDs automatically, which
is faster when you don't need to update documents by a known key.
By default, bulk raises a BulkIndexError if any document fails.
Catch it to inspect individual failures without aborting the entire batch:
from elasticsearch.helpers import bulk, BulkIndexError
try:
success, errors = bulk(client, product_actions())
except BulkIndexError as e:
for err in e.errors:
doc_id = err["index"]["_id"]
reason = err["index"]["error"]["reason"]
print(f"Failed to index {doc_id}: {reason}")

Alternatively, set raise_on_error=False to collect errors without raising:
success, errors = bulk(client, product_actions(), raise_on_error=False)
for err in errors:
print(err)

Adjust chunk_size (documents per request) and max_retries to
balance throughput against cluster pressure:
# Same call as above, with batching and retry knobs tuned explicitly.
success, errors = bulk(
client,
product_actions(),
chunk_size=500,  # documents sent per bulk request
max_retries=3,  # re-attempt failed documents up to 3 times
initial_backoff=1,  # wait (seconds) before the first retry
max_backoff=60,  # ceiling on the growing retry delay
)