Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 49 additions & 10 deletions cinder/volume/drivers/hpe/hpe_3par_base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# (c) Copyright 2013-2015 Hewlett Packard Enterprise Development LP
# (c) Copyright 2013-2015, 2026 Hewlett Packard Enterprise Development LP
# All Rights Reserved.
#
# Copyright 2012 OpenStack Foundation
Expand Down Expand Up @@ -30,6 +30,8 @@
except ImportError:
hpeexceptions = None

import threading

from oslo_log import log as logging

from cinder import coordination
Expand Down Expand Up @@ -73,6 +75,8 @@ def __init__(self, *args, **kwargs):
self.configuration.append_config_values(san.san_opts)
self.protocol = None
self.common = None
# Thread-local storage for client sessions to prevent race conditions
self._local = threading.local()

@staticmethod
def get_driver_options():
Expand All @@ -82,40 +86,73 @@ def _init_common(self):
return hpecommon.HPE3PARCommon(self.configuration,
self._active_backend_id)

def _login(self, timeout=None, array_id=None):
if self.common:
return self.common
def _get_common(self):
"""Get the common instance for the current thread or fallback to instance."""
# Try thread-local first (used during operations like initialize_connection)
if hasattr(self._local, 'common') and self._local.common is not None:
return self._local.common
# Fallback to instance-level common (used for stats, etc.)
return self.common

self.common = self._init_common()
def _login(self, timeout=None, array_id=None):
# Check if this thread already has an active common/client session
thread_id = threading.get_ident()
if hasattr(self._local, 'common') and self._local.common is not None:
# Reuse existing session for this thread
if self._local.common.client is not None:
try:
# Verify session is still valid
session_key = self._local.common.client.get_session_key()
if session_key:
LOG.debug("Reusing existing session %s for thread %s",
session_key, thread_id)
return self._local.common
except Exception:
LOG.debug("Existing session invalid, creating new one for thread %s",
thread_id)

# Create new common instance for this thread
self._local.common = self._init_common()
# If replication is enabled and we cannot login, we do not want to
# raise an exception so a failover can still be executed.
try:
self.common.do_setup(None, timeout=timeout, stats=self._stats,
self._local.common.do_setup(None, timeout=timeout, stats=self._stats,
array_id=array_id)
self.common.client_login()
self._local.common.client_login()
session_key = self._local.common.client.get_session_key()
LOG.debug("Created new session %s for thread %s", session_key, thread_id)
except Exception:
if self.common._replication_enabled:
if self._local.common._replication_enabled:
LOG.warning("The primary array is not reachable at this "
"time. Since replication is enabled, "
"listing replication targets and failing over "
"a volume can still be performed.")
else:
raise
return self.common
return self._local.common

def _logout(self, common):
# If replication is enabled and we do not have a client ID, we did not
# login, but can still failover. There is no need to logout.
if common.client is None and common._replication_enabled:
return

thread_id = threading.get_ident()
try:
s_key = common.client.get_session_key()
LOG.debug("Disconnect from 3PAR REST with "
"session key %s using thread id %s", s_key, thread_id)
common.client_logout()
except Exception as e:
if ("invalid session key" in str(e).lower() or
"resource in use" in str(e).lower()):
LOG.warning("Session already killed or in use, ignoring.")
else:
raise
finally:
# Clean up thread-local storage
if hasattr(self._local, 'common'):
self._local.common = None

def _check_flags(self, common):
"""Sanity check to ensure we have required options set."""
Expand All @@ -141,7 +178,7 @@ def get_volume_stats(self, refresh=False):
if not refresh:
return self._stats

common = self._login()
common = self._get_common()
self._stats = common.get_volume_stats(
refresh,
self.get_filter_function(),
Expand Down Expand Up @@ -363,6 +400,8 @@ def do_setup(self, context):
self._check_flags(common)
common.check_for_setup_error()
self._do_setup(common)
# Store the common instance for non-threaded operations like get_volume_stats
self.common = common

def _do_setup(self, common):
pass
Expand Down
5 changes: 4 additions & 1 deletion cinder/volume/drivers/hpe/hpe_3par_fc.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

volume_driver=cinder.volume.drivers.hpe.hpe_3par_fc.HPE3PARFCDriver
"""

import threading
try:
from hpe3parclient import exceptions as hpeexceptions
except ImportError:
Expand Down Expand Up @@ -209,8 +209,11 @@ def initialize_connection(self, volume, connector):
"""
LOG.debug("volume id: %(volume_id)s",
{'volume_id': volume['id']})

array_id = self.get_volume_replication_driver_data(volume)
common = self._login(array_id=array_id)
s_key=common.client.get_session_key()
LOG.debug("initialize_connection entry from FC with session key %s and thread id: %s", s_key, threading.get_native_id())
try:
# we have to make sure we have a host
host, cpg = self._create_host(common, volume, connector)
Expand Down
4 changes: 4 additions & 0 deletions cinder/volume/drivers/hpe/hpe_3par_iscsi.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import re
import sys
import threading

try:
from hpe3parclient import exceptions as hpeexceptions
Expand Down Expand Up @@ -401,6 +402,9 @@ def initialize_connection(self, volume, connector):
{'volume_id': volume['id']})
array_id = self.get_volume_replication_driver_data(volume)
common = self._login(array_id=array_id)
s_key = common.client.get_session_key()
LOG.debug("initialize_connection entry from ISCSI with session key %s and thread id: %s",
s_key, threading.get_ident())
try:
# If the volume has been failed over, we need to reinitialize
# iSCSI ports so they represent the new array.
Expand Down