endpoint bulk
lang python
es_version 9.3
client elasticsearch==9.3.0

Elasticsearch 9.3 bulk endpoint (Python example)

Use elasticsearch.helpers.bulk to index multiple documents efficiently. The helper batches actions into bulk requests for you, and because it accepts a generator, memory usage stays constant regardless of dataset size.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

# Example connection; adjust the URL and authentication for your cluster.
client = Elasticsearch("http://localhost:9200")

def product_actions():
    products = [
        {"name": "Espresso Machine Pro",       "brand": "BrewMaster",  "price": 899.99, "category": "appliances",  "in_stock": True,  "rating": 4.7},
        {"name": "Noise-Cancelling Headphones", "brand": "SoundCore",   "price": 249.00, "category": "electronics", "in_stock": True,  "rating": 4.5},
        {"name": "Ergonomic Standing Desk",     "brand": "DeskCraft",   "price": 599.00, "category": "furniture",   "in_stock": False, "rating": 4.8},
        {"name": "4K Webcam with Mic",          "brand": "StreamGear",  "price": 129.99, "category": "electronics", "in_stock": True,  "rating": 4.3},
        {"name": "Cast Iron Dutch Oven",        "brand": "HearthStone", "price":  79.95, "category": "cookware",    "in_stock": True,  "rating": 4.9},
        {"name": "Mechanical Keyboard",         "brand": "TypeForce",   "price": 169.00, "category": "electronics", "in_stock": True,  "rating": 4.6},
        {"name": "Air Purifier HEPA-13",        "brand": "CleanAir",    "price": 349.00, "category": "appliances",  "in_stock": True,  "rating": 4.4},
        {"name": "Bamboo Cutting Board Set",    "brand": "HearthStone", "price":  34.99, "category": "cookware",    "in_stock": True,  "rating": 4.2},
    ]
    for i, product in enumerate(products, start=1):
        yield {
            "_index": "products",
            "_id": f"prod-{i}",
            "_source": product,
        }

success, errors = bulk(client, product_actions())
print(f"Indexed {success} documents")

Each action dict must include _index and _source. The _id field is optional — omit it to let Elasticsearch generate IDs automatically, which is faster when you don't need to update documents by a known key.
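As a minimal sketch of the auto-ID variant, the generator below (a hypothetical `auto_id_actions`, with illustrative data) yields actions with only the required `_index` and `_source` keys:

```python
def auto_id_actions():
    """Yield bulk actions without _id; Elasticsearch assigns IDs on indexing."""
    products = [
        {"name": "Pour-Over Kettle", "price": 45.00, "category": "cookware"},
        {"name": "Milk Frother",     "price": 29.99, "category": "appliances"},
    ]
    for product in products:
        # No "_id" key: Elasticsearch generates one per document.
        yield {"_index": "products", "_source": product}
```

Pass this generator to `bulk` exactly as before: `bulk(client, auto_id_actions())`.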

Handling errors

By default, bulk raises a BulkIndexError if any document fails. Catch it to inspect the individual failures; documents that were indexed successfully are not rolled back:

from elasticsearch.helpers import bulk, BulkIndexError

try:
    success, errors = bulk(client, product_actions())
except BulkIndexError as e:
    for err in e.errors:
        doc_id = err["index"]["_id"]
        reason = err["index"]["error"]["reason"]
        print(f"Failed to index {doc_id}: {reason}")

Alternatively, set raise_on_error=False to collect errors without raising:

success, errors = bulk(client, product_actions(), raise_on_error=False)
for err in errors:
    print(err)
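Each entry in the returned errors list is a dict keyed by the operation type (`"index"` here), matching the shape used in the BulkIndexError example above. A small helper like the hypothetical `summarize_errors` below can tally failures by error type instead of printing them one by one:

```python
from collections import Counter

def summarize_errors(errors):
    """Tally bulk failures by Elasticsearch error type."""
    counts = Counter()
    for err in errors:
        # Each error dict has a single key: the operation type ("index" here).
        op = next(iter(err.values()))
        counts[op.get("error", {}).get("type", "unknown")] += 1
    return counts
```

For example, three failed documents might summarize to `Counter({"mapper_parsing_exception": 2, "version_conflict_engine_exception": 1})`.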

Large datasets

Adjust chunk_size (documents per request) and max_retries to balance throughput against cluster pressure:

success, errors = bulk(
    client,
    product_actions(),
    chunk_size=500,
    max_retries=3,
    initial_backoff=1,
    max_backoff=60,
)
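Conceptually, chunk_size slices the action stream into groups that are each sent as one bulk request. A minimal pure-Python sketch of that batching (the `chunked` helper is illustrative, not the library's internal function):

```python
from itertools import islice

def chunked(actions, chunk_size):
    """Group an iterable of actions into lists of at most chunk_size items."""
    it = iter(actions)
    while batch := list(islice(it, chunk_size)):
        yield batch

batches = list(chunked(range(7), 3))
# -> [[0, 1, 2], [3, 4, 5], [6]]
```

Larger chunks mean fewer round trips but bigger request payloads and more cluster pressure per request; the retry/backoff settings above govern what happens when a chunk is rejected (for example with HTTP 429).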