From e169af9ac0f56603be763466694b95fc2fd64e8c Mon Sep 17 00:00:00 2001 From: A Vertex SDK engineer Date: Wed, 1 Jul 2026 17:17:54 -0700 Subject: [PATCH] feat: Add revision_labels, revision_expire_time, revision_ttl, disable_memory_revisions, metadata, metadata_merge_strategy and overlap_event_count to Memory Bank IngestEvents API PiperOrigin-RevId: 941371961 --- agentplatform/_genai/memories.py | 34 +++++++++++++ agentplatform/_genai/types/common.py | 49 +++++++++++++++++++ .../replays/test_ingest_events_memory_bank.py | 31 +++++++++++- 3 files changed, 112 insertions(+), 2 deletions(-) diff --git a/agentplatform/_genai/memories.py b/agentplatform/_genai/memories.py index 37dda18b38..64838020d3 100644 --- a/agentplatform/_genai/memories.py +++ b/agentplatform/_genai/memories.py @@ -270,6 +270,40 @@ def _IngestEventsConfig_to_vertex( if getv(from_object, ["force_flush"]) is not None: setv(parent_object, ["forceFlush"], getv(from_object, ["force_flush"])) + if getv(from_object, ["revision_labels"]) is not None: + setv(parent_object, ["revisionLabels"], getv(from_object, ["revision_labels"])) + + if getv(from_object, ["revision_expire_time"]) is not None: + setv( + parent_object, + ["revisionExpireTime"], + getv(from_object, ["revision_expire_time"]), + ) + + if getv(from_object, ["revision_ttl"]) is not None: + setv(parent_object, ["revisionTtl"], getv(from_object, ["revision_ttl"])) + + if getv(from_object, ["disable_memory_revisions"]) is not None: + setv( + parent_object, + ["disableMemoryRevisions"], + getv(from_object, ["disable_memory_revisions"]), + ) + + if getv(from_object, ["metadata"]) is not None: + setv( + parent_object, + ["metadata"], + {k: v for k, v in getv(from_object, ["metadata"]).items()}, + ) + + if getv(from_object, ["metadata_merge_strategy"]) is not None: + setv( + parent_object, + ["metadataMergeStrategy"], + getv(from_object, ["metadata_merge_strategy"]), + ) + return to_object diff --git a/agentplatform/_genai/types/common.py b/agentplatform/_genai/types/common.py index a13952014e..adc7d6a91e 100644 --- a/agentplatform/_genai/types/common.py +++ b/agentplatform/_genai/types/common.py @@ -7344,6 +7344,10 @@ class MemoryGenerationTriggerConfigGenerationTriggerRule(_common.BaseModel): default=None, description="""Optional. Specifies to trigger generation if the stream is inactive for the specified duration after the most recent event. The duration must have a minute-level granularity.""", ) + overlap_event_count: Optional[int] = Field( + default=None, + description="""Optional. Re-include the last N already-processed events in the next window.""", + ) class MemoryGenerationTriggerConfigGenerationTriggerRuleDict(TypedDict, total=False): @@ -7358,6 +7362,9 @@ class MemoryGenerationTriggerConfigGenerationTriggerRuleDict(TypedDict, total=Fa idle_duration: Optional[str] """Optional. Specifies to trigger generation if the stream is inactive for the specified duration after the most recent event. The duration must have a minute-level granularity.""" + overlap_event_count: Optional[int] + """Optional. Re-include the last N already-processed events in the next window.""" + MemoryGenerationTriggerConfigGenerationTriggerRuleOrDict = Union[ MemoryGenerationTriggerConfigGenerationTriggerRule, @@ -10613,6 +10620,30 @@ class IngestEventsConfig(_common.BaseModel): default=None, description="""Optional. Forces a flush of all pending events in the stream and triggers memory generation immediately bypassing any conditions configured in the `generation_trigger_config`.""", ) + revision_labels: Optional[dict[str, str]] = Field( + default=None, + description="""Labels to apply to the memory revision. For example, you can use this to label a revision with its data source.""", + ) + revision_expire_time: Optional[datetime.datetime] = Field( + default=None, + description="""Optional. Input only. Timestamp of when the revision is considered expired. If not set, the memory revision will be kept until manually deleted.""", + ) + revision_ttl: Optional[str] = Field( + default=None, + description="""Optional. Input only. The TTL for the revision. The expiration time is computed: now + TTL.""", + ) + disable_memory_revisions: Optional[bool] = Field( + default=None, + description="""Optional. Input only. If true, no revisions will be created for this request.""", + ) + metadata: Optional[dict[str, MemoryMetadataValue]] = Field( + default=None, + description="""Optional. User-provided metadata for the generated memories. This is not generated by Memory Bank.""", + ) + metadata_merge_strategy: Optional[MemoryMetadataMergeStrategy] = Field( + default=None, + description="""Optional. The strategy to use when applying metadata to existing memories.""", + ) class IngestEventsConfigDict(TypedDict, total=False): @@ -10628,6 +10659,24 @@ class IngestEventsConfigDict(TypedDict, total=False): force_flush: Optional[bool] """Optional. Forces a flush of all pending events in the stream and triggers memory generation immediately bypassing any conditions configured in the `generation_trigger_config`.""" + revision_labels: Optional[dict[str, str]] + """Labels to apply to the memory revision. For example, you can use this to label a revision with its data source.""" + + revision_expire_time: Optional[datetime.datetime] + """Optional. Input only. Timestamp of when the revision is considered expired. If not set, the memory revision will be kept until manually deleted.""" + + revision_ttl: Optional[str] + """Optional. Input only. The TTL for the revision. The expiration time is computed: now + TTL.""" + + disable_memory_revisions: Optional[bool] + """Optional. Input only. If true, no revisions will be created for this request.""" + + metadata: Optional[dict[str, MemoryMetadataValueDict]] + """Optional. User-provided metadata for the generated memories. This is not generated by Memory Bank.""" + + metadata_merge_strategy: Optional[MemoryMetadataMergeStrategy] + """Optional. The strategy to use when applying metadata to existing memories.""" + IngestEventsConfigOrDict = Union[IngestEventsConfig, IngestEventsConfigDict] diff --git a/tests/unit/agentplatform/genai/replays/test_ingest_events_memory_bank.py b/tests/unit/agentplatform/genai/replays/test_ingest_events_memory_bank.py index 373355b08c..ff7ad46583 100644 --- a/tests/unit/agentplatform/genai/replays/test_ingest_events_memory_bank.py +++ b/tests/unit/agentplatform/genai/replays/test_ingest_events_memory_bank.py @@ -40,7 +40,12 @@ def test_ingest_events(client): } ] }, - generation_trigger_config={"generation_rule": {"idle_duration": "60s"}}, + # `overlap_event_count` re-includes trailing events from one generation + # window in the next so context is preserved across GenerateMemories + # calls. + generation_trigger_config={ + "generation_rule": {"idle_duration": "60s", "overlap_event_count": 1} + }, ) memories = list( client.agent_engines.memories.retrieve( @@ -66,7 +71,17 @@ def test_ingest_events(client): } ] }, - config={"wait_for_completion": True, "force_flush": True}, + # `revision_labels`, `metadata`, and `metadata_merge_strategy` are + # applied to the memories generated from the ingested events. Because + # `force_flush` makes generation synchronous, the user-provided metadata + # is observable on the retrieved memory below. + config={ + "wait_for_completion": True, + "force_flush": True, + "revision_labels": {"source": "ingest-events-test"}, + "metadata": {"topic": {"string_value": "jobs"}}, + "metadata_merge_strategy": "OVERWRITE", + }, ) memories = list( client.agent_engines.memories.retrieve( @@ -80,6 +95,18 @@ def test_ingest_events(client): # With `wait_for_completion` and `force_flush` set to True, there should be # memories immediately after the call. assert len(memories) >= 1 + # The user-provided `metadata` should be applied to the generated memory. + assert memories[0].memory.metadata["topic"].string_value == "jobs" + + # The user-provided `revision_labels` are applied to the generated memory's + # revision (not the Memory itself), so list the memory's revisions to verify. + revisions = list( + client.agent_engines.memories.revisions.list( + name=memories[0].memory.name, + ) + ) + assert revisions + assert revisions[0].labels == {"source": "ingest-events-test"} client.agent_engines.delete(name=agent_engine.api_resource.name, force=True)