Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions docs/zbs-vhost/40-frontend-protocol-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

## 2. 协议枚举(`VolumeProtocol`)

```
```text
NVMEoF | iSCSI | Vhost | CBD | NBD | RBD
```

Expand Down Expand Up @@ -45,7 +45,7 @@ NVMEoF | iSCSI | Vhost | CBD | NBD | RBD

**`APIQueryExternalPrimaryStorageHostProtocolRefMsg`**

```
```http
GET /zstack/v1/external-primary-storage/host-protocol-refs?q=primaryStorageUuid=xxx
GET /zstack/v1/external-primary-storage/{primaryStorageUuid}/host-protocol-refs
```
Expand Down Expand Up @@ -73,7 +73,7 @@ SDK:`QueryExternalPrimaryStorageHostProtocolRefAction`。

**`APIAddStorageProtocolMsg`**(注意拼写:Protocol,产品侧笔记里 "APIAddStoragePotocol" 少了 r)

```
```http
POST /zstack/v1/primary-storage/protocol
{
"params": {
Expand All @@ -98,7 +98,7 @@ POST /zstack/v1/primary-storage/protocol

**`APICreateDataVolumeMsg`**(在现有创建数据卷 API 上新增可选参数)

```
```http
POST /zstack/v1/volumes/data
{
"params": {
Expand Down Expand Up @@ -138,7 +138,7 @@ Cloud 建 VM 走 `APICreateVmInstanceMsg`,数据盘由 `dataDiskOfferingUuids`
tag 格式(**注意 `ephemeral::` 前缀必填**):`ephemeral::volumeProtocol::{protocol}`,protocol ∈ §2 枚举。前缀是后端框架的「临时 tag」约定——带前缀的 tag 在建卷时被消费进 `VolumeVO.protocol` 后由框架自动丢弃、绝不落库;漏掉前缀则不被识别(既不消费也不报错,等于没传)。

**例:两块数据盘,盘 0 用 Vhost、盘 1 用 CBD**
```
```http
POST /zstack/v1/vm-instances
{
"params": {
Expand Down Expand Up @@ -167,7 +167,7 @@ POST /zstack/v1/vm-instances

**`APIChangeVolumeProtocolMsg`**(action 式)

```
```http
PUT /zstack/v1/volumes/{volumeUuid}/actions
{
"changeVolumeProtocol": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.zstack.header.vm.VmInstanceSpec;
import org.zstack.header.vm.VmInstanceState;
import org.zstack.header.volume.VolumeInventory;
import org.zstack.header.volume.VolumeProtocol;
import org.zstack.header.volume.VolumeVO;
import org.zstack.header.volume.VolumeVO_;
import org.zstack.kvm.*;
Expand Down Expand Up @@ -242,19 +243,29 @@ private void checkHostStatus(KVMHostInventory host, List<ExternalPrimaryStorageV
extPsFactory.getControllerSvc(extPs.getUuid()).reportNodeHealthy(host, new ReturnValueCompletion<NodeHealthy>(compl) {
@Override
public void success(NodeHealthy returnValue) {
returnValue.getHealthy().forEach((p, h) -> updateHostProtocolRef(host.getUuid(), extPs.getUuid(),
p.toString(), h == StorageHealthy.Ok ? PrimaryStorageHostStatus.Connected : PrimaryStorageHostStatus.Disconnected));

Map<VolumeProtocol, StorageHealthy> healthy = returnValue.getHealthy();
ErrorCode err = null;
PrimaryStorageHostStatus status;
// TODO add multi protocol support
if (returnValue.getHealthy().values().stream().allMatch(h -> h == StorageHealthy.Ok)) {
status = PrimaryStorageHostStatus.Connected;
} else {
if (healthy == null || healthy.isEmpty()) {
// an empty health report means the probe yielded nothing; fold it to
// Disconnected instead of letting allMatch() collapse to Connected
status = PrimaryStorageHostStatus.Disconnected;
err = operr(ORG_ZSTACK_EXTERNALSTORAGE_PRIMARY_KVM_10000, "external primary storage[uuid:%s, name:%s] returns unhealthy status: %s",
((ExternalPrimaryStorageVO) extPs).getUuid(), ((ExternalPrimaryStorageVO) extPs).getName(), returnValue.getHealthy());
err = operr(ORG_ZSTACK_EXTERNALSTORAGE_PRIMARY_KVM_10000, "external primary storage[uuid:%s, name:%s] reported no protocol health",
((ExternalPrimaryStorageVO) extPs).getUuid(), ((ExternalPrimaryStorageVO) extPs).getName());
compl.addError(err);
} else {
healthy.forEach((p, h) -> updateHostProtocolRef(host.getUuid(), extPs.getUuid(),
p.toString(), h == StorageHealthy.Ok ? PrimaryStorageHostStatus.Connected : PrimaryStorageHostStatus.Disconnected));

if (healthy.values().stream().allMatch(h -> h == StorageHealthy.Ok)) {
status = PrimaryStorageHostStatus.Connected;
} else {
status = PrimaryStorageHostStatus.Disconnected;
err = operr(ORG_ZSTACK_EXTERNALSTORAGE_PRIMARY_KVM_10000, "external primary storage[uuid:%s, name:%s] returns unhealthy status: %s",
((ExternalPrimaryStorageVO) extPs).getUuid(), ((ExternalPrimaryStorageVO) extPs).getName(), healthy);
compl.addError(err);
}
}

if (hostStatus.get(extPs.getUuid()) != status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2419,15 +2419,32 @@ public void fail(ErrorCode errorCode) {
PrimaryStorageHostStatus.Disconnected);
logger.warn(String.format("failed to prepare host[%s] for protocol[%s] on primary storage[uuid:%s]: %s",
host.getUuid(), msg.getOutputProtocol(), msg.getUuid(), errorCode.getDetails()));
compl.addError(errorCode);
compl.done();
}
})
).run(new WhileDoneCompletion(completion) {
@Override
public void done(ErrorCodeList errorCodeList) {
// protocol is already registered; host preparation self-heals on the next ping
int failed = errorCodeList.getCauses().size();
if (failed > hostVOs.size() / 2) {
// most hosts cannot serve the protocol; roll back the registration so a
// mostly-unusable protocol is not exposed
unregisterOutputProtocol(msg.getUuid(), msg.getOutputProtocol());
completion.fail(errorCodeList.getCauses().get(0));
return;
}
// a minority failed; keep the protocol, those hosts self-heal on the next ping
ExternalPrimaryStorage.super.doAddProtocol(msg, completion);
}
});
}

private void unregisterOutputProtocol(String psUuid, String protocol) {
SQL.New("delete from PrimaryStorageOutputProtocolRefVO ref" +
" where ref.primaryStorageUuid = :psUuid and ref.outputProtocol = :protocol")
.param("psUuid", psUuid)
.param("protocol", protocol)
.execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;

import javax.persistence.Query;
import javax.persistence.Tuple;
import java.util.*;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -373,27 +374,21 @@ public PrimaryStorageNodeSvc getNodeSvc(String primaryStorageUuid) {

// per-protocol host connectivity is an external-PS-internal concept; the row is
// written directly here so generic primary storage messages stay protocol-agnostic.
// upsert on the (primaryStorageUuid, hostUuid, protocol) unique key so concurrent pings and
// AddProtocol on the same host cannot race a find-then-insert into duplicate rows.
@Transactional
public void updateHostProtocolStatus(String psUuid, String hostUuid, String protocol, PrimaryStorageHostStatus newStatus) {
ExternalPrimaryStorageHostProtocolRefVO ref = Q.New(ExternalPrimaryStorageHostProtocolRefVO.class)
.eq(ExternalPrimaryStorageHostProtocolRefVO_.primaryStorageUuid, psUuid)
.eq(ExternalPrimaryStorageHostProtocolRefVO_.hostUuid, hostUuid)
.eq(ExternalPrimaryStorageHostProtocolRefVO_.protocol, protocol)
.find();
if (ref == null) {
ref = new ExternalPrimaryStorageHostProtocolRefVO();
ref.setPrimaryStorageUuid(psUuid);
ref.setHostUuid(hostUuid);
ref.setProtocol(protocol);
ref.setStatus(newStatus);
dbf.persist(ref);
logger.debug(String.format("created protocol[%s] connectivity row between primary storage[uuid:%s]" +
" and host[uuid:%s] with status %s", protocol, psUuid, hostUuid, newStatus));
} else if (ref.getStatus() != newStatus) {
ref.setStatus(newStatus);
dbf.update(ref);
logger.debug(String.format("change protocol[%s] connectivity between primary storage[uuid:%s]" +
" and host[uuid:%s] to %s", protocol, psUuid, hostUuid, newStatus));
}
Query q = dbf.getEntityManager().createNativeQuery("insert into ExternalPrimaryStorageHostProtocolRefVO" +
" (hostUuid, primaryStorageUuid, protocol, status, createDate, lastOpDate)" +
" values (:hostUuid, :psUuid, :protocol, :status, current_timestamp(), current_timestamp())" +
" on duplicate key update lastOpDate = if(status <> :status, current_timestamp(), lastOpDate), status = :status");
q.setParameter("hostUuid", hostUuid);
q.setParameter("psUuid", psUuid);
q.setParameter("protocol", protocol);
q.setParameter("status", newStatus.name());
q.executeUpdate();
logger.debug(String.format("upserted protocol[%s] connectivity between primary storage[uuid:%s]" +
" and host[uuid:%s] to %s", protocol, psUuid, hostUuid, newStatus));
}

private PrimaryStorageNodeSvc getNodeSvcByVolume(VolumeInventory volumeInventory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ class ZbsVhostVolumeCase extends SubCase {
testUnknownRootVolumeProtocolSystemTagRejected()
testVhostVmStartActivationChain()
testAddProtocolPreparesHostsAndRecordsProtocolRefs()
testAddProtocolRollsBackWhenMajorityHostsFail()
}
}

Expand Down Expand Up @@ -666,6 +667,70 @@ class ZbsVhostVolumeCase extends SubCase {
}
}

// CR9 hardening: when more than half the connected hosts fail to deploy the ZBS
// client for a newly added protocol, the protocol MUST NOT stay exposed. doAddProtocol
// persists PrimaryStorageOutputProtocolRefVO before host prep (Flow1 is NoRollbackFlow),
// so a majority-failure must roll that row back and fail the API, else the UI advertises
// a protocol no host can serve. Only a baseline deployClient failure marks a host failed;
// the vhost-target ensure is best-effort/self-healing and is intentionally not counted.
void testAddProtocolRollsBackWhenMajorityHostsFail() {
AtomicBoolean deployClientShouldFail = new AtomicBoolean(false)
env.simulator(ZbsStorageController.DEPLOY_CLIENT_PATH) { HttpEntity<String> e, EnvSpec spec ->
def rsp = new ZbsStorageController.DeployClientRsp()
if (deployClientShouldFail.get()) {
rsp.success = false
rsp.error = "injected: host cannot deploy zbs client"
}
return rsp
}
env.afterSimulator(ZbsStorageController.CREATE_VOLUME_PATH) { rsp, HttpEntity<String> e ->
def cmd = JSONObjectUtil.toObject(e.body, ZbsStorageController.CreateVolumeCmd)
if (cmd.volume == ZbsConstants.ZBS_HEARTBEAT_VOLUME_NAME) {
def vrsp = new ZbsStorageController.CreateVolumeRsp()
vrsp.installPath = "zbs://${cmd.logicalPool}/${cmd.volume}".toString()
return vrsp
}
return rsp
}

// attach with deploy succeeding so the single host becomes Connected
attachPrimaryStorageToCluster {
primaryStorageUuid = ps.uuid
clusterUuid = cluster.uuid
}

// start from a PS that does not yet expose Vhost, then add it while the only
// connected host can no longer deploy the client (1 of 1 failed = majority)
SQL.New(PrimaryStorageOutputProtocolRefVO.class)
.eq(PrimaryStorageOutputProtocolRefVO_.primaryStorageUuid, ps.uuid)
.eq(PrimaryStorageOutputProtocolRefVO_.outputProtocol, VolumeProtocol.Vhost.toString())
.delete()

deployClientShouldFail.set(true)
expectApiFailure({
addStorageProtocol {
uuid = ps.uuid
outputProtocol = VolumeProtocol.Vhost.toString()
}
}) {
assert delegate != null : "addStorageProtocol must fail when the only connected host cannot be prepared"
}

assert !Q.New(PrimaryStorageOutputProtocolRefVO.class)
.eq(PrimaryStorageOutputProtocolRefVO_.primaryStorageUuid, ps.uuid)
.eq(PrimaryStorageOutputProtocolRefVO_.outputProtocol, VolumeProtocol.Vhost.toString())
.isExists() : \
"Vhost output protocol left exposed after a majority of hosts failed to prepare it; " +
"doAddProtocol must roll back PrimaryStorageOutputProtocolRefVO on majority-failure (CR9)"

// restore so a later add of Vhost (and env teardown) behaves normally
deployClientShouldFail.set(false)
detachPrimaryStorageFromCluster {
primaryStorageUuid = ps.uuid
clusterUuid = cluster.uuid
}
}

private MessageReply syncSend(org.zstack.header.message.Message msg) {
AtomicReference<MessageReply> ref = new AtomicReference<>()
CountDownLatch done = new CountDownLatch(1)
Expand Down