From 4998a5aaa09474672c679963be36258eb19e6c15 Mon Sep 17 00:00:00 2001
From: jlothe
Date: Wed, 4 Feb 2026 19:09:00 +0530
Subject: [PATCH] ESC-16904 - Added changes to handle parallel thread
 operations to avoid overwriting session key

Each driver call used to share the single HPE3PARCommon instance cached
on the driver, so parallel operations could overwrite each other's
session key. Keep the logged-in common object in thread-local storage
instead: _login() reuses or creates the calling thread's session and
_logout() drops it again once that same object has been logged out.
do_setup() still stores a shared instance, which get_volume_stats()
reads through _get_common().

---
 cinder/volume/drivers/hpe/hpe_3par_base.py  | 63 +++++++++++++++++----
 cinder/volume/drivers/hpe/hpe_3par_fc.py    |  5 +-
 cinder/volume/drivers/hpe/hpe_3par_iscsi.py |  4 ++
 3 files changed, 60 insertions(+), 12 deletions(-)

diff --git a/cinder/volume/drivers/hpe/hpe_3par_base.py b/cinder/volume/drivers/hpe/hpe_3par_base.py
index 1bec0190a78..b2f94d304d4 100644
--- a/cinder/volume/drivers/hpe/hpe_3par_base.py
+++ b/cinder/volume/drivers/hpe/hpe_3par_base.py
@@ -1,4 +1,4 @@
-#    (c) Copyright 2013-2015 Hewlett Packard Enterprise Development LP
+#    (c) Copyright 2013-2015, 2026 Hewlett Packard Enterprise Development LP
 #    All Rights Reserved.
 #
 #    Copyright 2012 OpenStack Foundation
@@ -30,6 +30,8 @@
 except ImportError:
     hpeexceptions = None
 
+import threading
+
 from oslo_log import log as logging
 
 from cinder import coordination
@@ -73,6 +75,8 @@ def __init__(self, *args, **kwargs):
         self.configuration.append_config_values(san.san_opts)
         self.protocol = None
         self.common = None
+        # Thread-local storage for client sessions to prevent race conditions
+        self._local = threading.local()
 
     @staticmethod
     def get_driver_options():
@@ -82,33 +86,64 @@ def _init_common(self):
         return hpecommon.HPE3PARCommon(self.configuration,
                                        self._active_backend_id)
 
-    def _login(self, timeout=None, array_id=None):
-        if self.common:
-            return self.common
+    def _get_common(self):
+        """Return this thread's common, falling back to the shared one."""
+        # Thread-local first (used by calls such as initialize_connection).
+        common = getattr(self._local, 'common', None)
+        if common is not None:
+            return common
+        # Fall back to instance-level common (used for stats, etc.)
+        return self.common
 
-        self.common = self._init_common()
+    def _login(self, timeout=None, array_id=None):
+        # Check if this thread already has an active common/client session
+        thread_id = threading.get_ident()
+        common = getattr(self._local, 'common', None)
+        if common is not None and common.client is not None:
+            # Reuse existing session for this thread
+            try:
+                # Verify session is still valid
+                session_key = common.client.get_session_key()
+                if session_key:
+                    LOG.debug("Reusing existing session %s for thread %s",
+                              session_key, thread_id)
+                    return common
+            except Exception:
+                LOG.debug("Existing session invalid, creating new one "
+                          "for thread %s", thread_id)
+
+        # Create new common instance for this thread
+        common = self._local.common = self._init_common()
         # If replication is enabled and we cannot login, we do not want to
         # raise an exception so a failover can still be executed.
         try:
-            self.common.do_setup(None, timeout=timeout, stats=self._stats,
-                                 array_id=array_id)
-            self.common.client_login()
+            common.do_setup(None, timeout=timeout, stats=self._stats,
+                            array_id=array_id)
+            common.client_login()
+            session_key = common.client.get_session_key()
+            LOG.debug("Created new session %s for thread %s",
+                      session_key, thread_id)
         except Exception:
-            if self.common._replication_enabled:
+            if common._replication_enabled:
                 LOG.warning("The primary array is not reachable at this "
                             "time. Since replication is enabled, "
                             "listing replication targets and failing over "
                             "a volume can still be performed.")
             else:
                 raise
-        return self.common
+        return common
 
     def _logout(self, common):
         # If replication is enabled and we do not have a client ID, we did not
         # login, but can still failover. There is no need to logout.
         if common.client is None and common._replication_enabled:
             return
+
+        thread_id = threading.get_ident()
         try:
+            s_key = common.client.get_session_key()
+            LOG.debug("Disconnect from 3PAR REST with "
+                      "session key %s using thread id %s", s_key, thread_id)
             common.client_logout()
         except Exception as e:
             if ("invalid session key" in str(e).lower() or
@@ -116,6 +151,10 @@ def _logout(self, common):
                 LOG.warning("Session already killed or in use, ignoring.")
             else:
                 raise
+        finally:
+            # Drop the thread-local entry only if it is the one logged out
+            if getattr(self._local, 'common', None) is common:
+                self._local.common = None
 
     def _check_flags(self, common):
         """Sanity check to ensure we have required options set."""
@@ -141,7 +180,7 @@ def get_volume_stats(self, refresh=False):
         if not refresh:
             return self._stats
 
-        common = self._login()
+        common = self._get_common()
         self._stats = common.get_volume_stats(
             refresh,
             self.get_filter_function(),
@@ -363,6 +402,8 @@ def do_setup(self, context):
         self._check_flags(common)
         common.check_for_setup_error()
         self._do_setup(common)
+        # Keep the common instance for non-threaded users (get_volume_stats)
+        self.common = common
 
     def _do_setup(self, common):
         pass
diff --git a/cinder/volume/drivers/hpe/hpe_3par_fc.py b/cinder/volume/drivers/hpe/hpe_3par_fc.py
index 8d6773e40c2..d6443bb4c16 100644
--- a/cinder/volume/drivers/hpe/hpe_3par_fc.py
+++ b/cinder/volume/drivers/hpe/hpe_3par_fc.py
@@ -28,7 +28,7 @@
 
     volume_driver=cinder.volume.drivers.hpe.hpe_3par_fc.HPE3PARFCDriver
 """
-
+import threading
 try:
     from hpe3parclient import exceptions as hpeexceptions
 except ImportError:
@@ -209,8 +209,11 @@ def initialize_connection(self, volume, connector):
         """
         LOG.debug("volume id: %(volume_id)s",
                   {'volume_id': volume['id']})
         array_id = self.get_volume_replication_driver_data(volume)
         common = self._login(array_id=array_id)
+        s_key = common.client.get_session_key() if common.client else None
+        LOG.debug("initialize_connection entry from FC with session key "
+                  "%s and thread id: %s", s_key, threading.get_ident())
         try:
             # we have to make sure we have a host
             host, cpg = self._create_host(common, volume, connector)
diff --git a/cinder/volume/drivers/hpe/hpe_3par_iscsi.py b/cinder/volume/drivers/hpe/hpe_3par_iscsi.py
index 9c0438e77e7..ac796b54f16 100644
--- a/cinder/volume/drivers/hpe/hpe_3par_iscsi.py
+++ b/cinder/volume/drivers/hpe/hpe_3par_iscsi.py
@@ -29,6 +29,7 @@
 
 import re
 import sys
+import threading
 
 try:
     from hpe3parclient import exceptions as hpeexceptions
@@ -401,6 +402,9 @@ def initialize_connection(self, volume, connector):
                   {'volume_id': volume['id']})
         array_id = self.get_volume_replication_driver_data(volume)
         common = self._login(array_id=array_id)
+        s_key = common.client.get_session_key() if common.client else None
+        LOG.debug("initialize_connection entry from ISCSI with session key "
+                  "%s and thread id: %s", s_key, threading.get_ident())
         try:
             # If the volume has been failed over, we need to reinitialize
             # iSCSI ports so they represent the new array.