Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/StfSender/StfSenderDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -318,16 +318,19 @@ void StfSenderDevice::ResetTask()
I().mFileSink->stop();
}

// Stop output handler
// Stop the RPC server first, so no new connect/disconnect/data requests can
// reference the output handler while it is being torn down.
if (!standalone()) {
I().mRpcServer->stop();
}

// Stop output handler (also required for standalone/flp-only runs)
if (I().mOutputHandler) {
I().mOutputHandler->stop();
}

if (!standalone()) {
// Stop the RPC server after output
I().mRpcServer->stop();

// Stop the Scheduler RPC client
// Stop the Scheduler RPC client
I().mTfSchedulerRpcClient.stop();
}

Expand Down
28 changes: 25 additions & 3 deletions src/StfSender/StfSenderOutputUCX.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,21 @@ void StfSenderOutputUCX::stop()
if (mDeallocThread.joinable()) {
mDeallocThread.join();
}

// join any pending async endpoint-close threads while the workers/context are still valid
{
std::vector<std::thread> lCloseThreads;
{
std::scoped_lock lLock(mDisconnectThreadsLock);
lCloseThreads = std::move(mDisconnectThreads);
mDisconnectThreads.clear();
}
for (auto &lThread : lCloseThreads) {
if (lThread.joinable()) {
lThread.join();
}
}
}
DDDLOG("StfSenderOutputUCX::stop: stopped all threads.");

// close all connections
Expand Down Expand Up @@ -244,8 +259,11 @@ bool StfSenderOutputUCX::disconnectTfBuilder(const std::string &pTfBuilderId)
lConnInfo = std::move(lConnInfoNode.mapped());
}

// Transport is only closed when other side execute close as well. Execute async
std::thread([this, pConnInfo = std::move(lConnInfo), pTfBuilderId](){
// The transport is only closed once the other side executes close as well, so run the close asynchronously.
// NOTE: do not detach. The thread uses our UCX workers and member state, so it must
// be joined in stop() before the workers/context are destroyed (otherwise teardown at
// end of run races the close and segfaults). See https://its.cern.ch/jira/browse/R3C-1147
std::thread lCloseThread([this, pConnInfo = std::move(lConnInfo), pTfBuilderId](){
DDDLOG("StfSenderOutputUCX::disconnectTfBuilder: closing transport for tf_builder={}", pTfBuilderId);
// acquire the lock and close the connection
std::unique_lock lTfSenderLock(pConnInfo->mTfBuilderLock);
Expand All @@ -256,8 +274,12 @@ bool StfSenderOutputUCX::disconnectTfBuilder(const std::string &pTfBuilderId)
std::scoped_lock lLockTfBuilders(mStfsInFlightMutex);
mDisconnectedTfBuilders.insert(pTfBuilderId);
}
});

}).detach();
{
std::scoped_lock lLock(mDisconnectThreadsLock);
mDisconnectThreads.emplace_back(std::move(lCloseThread));
}

return true;
}
Expand Down
19 changes: 19 additions & 0 deletions src/StfSender/StfSenderOutputUCX.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ class StfSenderOutputUCX : public ISubTimeFrameConstVisitor {

StfSenderOutputUCX(std::shared_ptr<ConsulStfSender> pDiscoveryConfig, StdSenderOutputCounters &pCounters);

~StfSenderOutputUCX() {
  // Last line of defence for the case where stop() was never called: take
  // ownership of every tracked endpoint-close thread and wait for each one,
  // so that none of them is still running once this object is gone.
  std::vector<std::thread> lPending;
  {
    // Hold the lock only while the tracked threads are handed over; the
    // joins below run without it.
    std::scoped_lock lGuard(mDisconnectThreadsLock);
    lPending.swap(mDisconnectThreads);
  }

  for (std::thread &lCloser : lPending) {
    if (!lCloser.joinable()) {
      continue;
    }
    lCloser.join();
  }
}

bool start();
void stop();

Expand Down Expand Up @@ -212,6 +226,11 @@ class StfSenderOutputUCX : public ISubTimeFrameConstVisitor {

ConcurrentFifo<std::unique_ptr<SendStfInfo>> mSendRequestQueue;

/// async endpoint-close threads spawned by disconnectTfBuilder()
/// tracked so stop() can join them before destroying the workers/context
std::mutex mDisconnectThreadsLock;
std::vector<std::thread> mDisconnectThreads;

/// map of STFs waiting on transfers
std::mutex mStfsInFlightMutex;
std::map<std::uint64_t, std::unique_ptr<SendStfInfo>> mStfsInFlight;
Expand Down