From c94a7f4ad8cc2417b23b3250e61a601475f63c7e Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Wed, 24 Jun 2026 23:35:23 +0200 Subject: [PATCH] stfsender: fix segfault on termination (R3C-1147) At the end of every run all TfBuilders disconnect from each StfSender. Each UCX disconnect (StfSenderOutputUCX::disconnectTfBuilder) spawned a detached thread that keeps progressing a UCX worker and touching object state, but stop() never waited for it: it went straight to ucp_worker_destroy()/ucp_cleanup() and the object was then destructed. The still-running detached threads then used freed UCX workers / a destructed object, causing a SIGSEGV (core dumped) and a burst of errors as connections dropped. - StfSenderOutputUCX: track the async endpoint-close threads instead of detaching them and join them in stop() while the workers/context are still valid; add a destructor as a safety net in case stop() is skipped. - StfSenderDevice::ResetTask: stop the gRPC RPC server before the output handler so no late connect/disconnect/data request can reference the output handler mid-teardown (keeps the unconditional output stop from ce899b9 for flp-only runs). Ref: https://its.cern.ch/jira/browse/R3C-1147 --- src/StfSender/StfSenderDevice.cxx | 13 ++++++++----- src/StfSender/StfSenderOutputUCX.cxx | 28 +++++++++++++++++++++++++--- src/StfSender/StfSenderOutputUCX.h | 19 +++++++++++++++++++ 3 files changed, 52 insertions(+), 8 deletions(-) diff --git a/src/StfSender/StfSenderDevice.cxx b/src/StfSender/StfSenderDevice.cxx index 4c0fa14..f70810c 100644 --- a/src/StfSender/StfSenderDevice.cxx +++ b/src/StfSender/StfSenderDevice.cxx @@ -318,16 +318,19 @@ void StfSenderDevice::ResetTask() I().mFileSink->stop(); } - // Stop output handler + // Stop the RPC server first, so no new connect/disconnect/data requests can + // reference the output handler while it is being torn down. + if (!standalone()) { + I().mRpcServer->stop(); + } + + // Stop output handler (also required for standalone/flp-only runs) if (I().mOutputHandler) { I().mOutputHandler->stop(); } if (!standalone()) { - // Stop the RPC server after output - I().mRpcServer->stop(); - - // Stop the Scheduler RPC client + // Stop the Scheduler RPC client I().mTfSchedulerRpcClient.stop(); } diff --git a/src/StfSender/StfSenderOutputUCX.cxx b/src/StfSender/StfSenderOutputUCX.cxx index 22c7f07..b7891f8 100644 --- a/src/StfSender/StfSenderOutputUCX.cxx +++ b/src/StfSender/StfSenderOutputUCX.cxx @@ -132,6 +132,21 @@ void StfSenderOutputUCX::stop() if (mDeallocThread.joinable()) { mDeallocThread.join(); } + + // join any pending async endpoint-close threads while the workers/context are still valid + { + std::vector lCloseThreads; + { + std::scoped_lock lLock(mDisconnectThreadsLock); + lCloseThreads = std::move(mDisconnectThreads); + mDisconnectThreads.clear(); + } + for (auto &lThread : lCloseThreads) { + if (lThread.joinable()) { + lThread.join(); + } + } + } DDDLOG("StfSenderOutputUCX::stop: stopped all threads."); // close all connections @@ -244,8 +259,11 @@ bool StfSenderOutputUCX::disconnectTfBuilder(const std::string &pTfBuilderId) lConnInfo = std::move(lConnInfoNode.mapped()); } - // Transport is only closed when other side execute close as well. Execute async - std::thread([this, pConnInfo = std::move(lConnInfo), pTfBuilderId](){ + // Transport is only closed when other side execute close as well. Execute async. + // NOTE: do not detach. The thread uses our UCX workers and member state, so it must + // be joined in stop() before the workers/context are destroyed (otherwise teardown at + // end of run races the close and segfaults). See https://its.cern.ch/jira/browse/R3C-1147 + std::thread lCloseThread([this, pConnInfo = std::move(lConnInfo), pTfBuilderId](){ DDDLOG("StfSenderOutputUCX::disconnectTfBuilder: closing transport for tf_builder={}", pTfBuilderId); // acquire the lock and close the connection std::unique_lock lTfSenderLock(pConnInfo->mTfBuilderLock); @@ -256,8 +274,12 @@ bool StfSenderOutputUCX::disconnectTfBuilder(const std::string &pTfBuilderId) std::scoped_lock lLockTfBuilders(mStfsInFlightMutex); mDisconnectedTfBuilders.insert(pTfBuilderId); } + }); - }).detach(); + { + std::scoped_lock lLock(mDisconnectThreadsLock); + mDisconnectThreads.emplace_back(std::move(lCloseThread)); + } return true; } diff --git a/src/StfSender/StfSenderOutputUCX.h b/src/StfSender/StfSenderOutputUCX.h index 1bd7834..9e8545e 100644 --- a/src/StfSender/StfSenderOutputUCX.h +++ b/src/StfSender/StfSenderOutputUCX.h @@ -68,6 +68,20 @@ class StfSenderOutputUCX : public ISubTimeFrameConstVisitor { StfSenderOutputUCX(std::shared_ptr pDiscoveryConfig, StdSenderOutputCounters &pCounters); + ~StfSenderOutputUCX() { + // safety net: make sure no async close thread outlives the object if stop() was skipped + std::vector lCloseThreads; + { + std::scoped_lock lLock(mDisconnectThreadsLock); + lCloseThreads = std::move(mDisconnectThreads); + } + for (auto &lThread : lCloseThreads) { + if (lThread.joinable()) { + lThread.join(); + } + } + } + bool start(); void stop(); @@ -212,6 +226,11 @@ class StfSenderOutputUCX : public ISubTimeFrameConstVisitor { ConcurrentFifo> mSendRequestQueue; + /// async endpoint-close threads spawned by disconnectTfBuilder() + /// tracked so stop() can join them before destroying the workers/context + std::mutex mDisconnectThreadsLock; + std::vector mDisconnectThreads; + /// map of STFs waiting on transfers std::mutex mStfsInFlightMutex; std::map> mStfsInFlight;