@@ -32,7 +32,7 @@ use tracing::warn;
3232use ulid:: Ulid ;
3333
3434use crate :: {
35- alerts:: target:: Target ,
35+ alerts:: { alert_structs :: AlertStateEntry , target:: Target } ,
3636 catalog:: { manifest:: Manifest , partition_path} ,
3737 handlers:: http:: {
3838 modal:: { Metadata , NodeMetadata , NodeType } ,
@@ -49,8 +49,8 @@ use crate::{
4949 SETTINGS_ROOT_DIRECTORY , STREAM_METADATA_FILE_NAME , STREAM_ROOT_DIRECTORY ,
5050 TARGETS_ROOT_DIRECTORY ,
5151 object_storage:: {
52- alert_json_path, filter_path , manifest_path , parseable_json_path , schema_path ,
53- stream_json_path, to_bytes,
52+ alert_json_path, alert_state_json_path , filter_path , manifest_path ,
53+ parseable_json_path , schema_path , stream_json_path, to_bytes,
5454 } ,
5555 } ,
5656 users:: filters:: { Filter , migrate_v1_v2} ,
@@ -115,6 +115,102 @@ impl Metastore for ObjectStoreMetastore {
115115 . await ?)
116116 }
117117
118+ /// alerts state
119+ async fn get_alert_states ( & self ) -> Result < Vec < AlertStateEntry > , MetastoreError > {
120+ let base_path = RelativePathBuf :: from_iter ( [ ALERTS_ROOT_DIRECTORY ] ) ;
121+ let alert_state_bytes = self
122+ . storage
123+ . get_objects (
124+ Some ( & base_path) ,
125+ Box :: new ( |file_name| {
126+ file_name. starts_with ( "alert_state_" ) && file_name. ends_with ( ".json" )
127+ } ) ,
128+ )
129+ . await ?;
130+
131+ let mut alert_states = Vec :: new ( ) ;
132+ for bytes in alert_state_bytes {
133+ if let Ok ( entry) = serde_json:: from_slice :: < AlertStateEntry > ( & bytes) {
134+ alert_states. push ( entry) ;
135+ }
136+ }
137+ Ok ( alert_states)
138+ }
139+
140+ async fn get_alert_state_entry (
141+ & self ,
142+ alert_id : & Ulid ,
143+ ) -> Result < Option < AlertStateEntry > , MetastoreError > {
144+ let path = alert_state_json_path ( * alert_id) ;
145+ match self . storage . get_object ( & path) . await {
146+ Ok ( bytes) => {
147+ if let Ok ( entry) = serde_json:: from_slice :: < AlertStateEntry > ( & bytes) {
148+ Ok ( Some ( entry) )
149+ } else {
150+ Ok ( None )
151+ }
152+ }
153+ Err ( ObjectStorageError :: NoSuchKey ( _) ) => Ok ( None ) ,
154+ Err ( e) => Err ( MetastoreError :: ObjectStorageError ( e) ) ,
155+ }
156+ }
157+
158+ async fn put_alert_state ( & self , obj : & dyn MetastoreObject ) -> Result < ( ) , MetastoreError > {
159+ let id = Ulid :: from_string ( & obj. get_object_id ( ) ) . map_err ( |e| MetastoreError :: Error {
160+ status_code : StatusCode :: BAD_REQUEST ,
161+ message : e. to_string ( ) ,
162+ flow : "put_alert_state" . into ( ) ,
163+ } ) ?;
164+ let path = alert_state_json_path ( id) ;
165+
166+ // Parse the new state entry from the MetastoreObject
167+ let new_state_entry: AlertStateEntry = serde_json:: from_slice ( & to_bytes ( obj) ) ?;
168+ let new_state = new_state_entry
169+ . current_state ( )
170+ . ok_or_else ( || MetastoreError :: InvalidJsonStructure {
171+ expected : "AlertStateEntry with at least one state" . to_string ( ) ,
172+ found : "AlertStateEntry with empty states" . to_string ( ) ,
173+ } ) ?
174+ . state ;
175+
176+ // Try to read existing file
177+ let mut alert_entry = match self . storage . get_object ( & path) . await {
178+ Ok ( existing_bytes) => {
179+ if let Ok ( entry) = serde_json:: from_slice :: < AlertStateEntry > ( & existing_bytes) {
180+ entry
181+ } else {
182+ // Create new entry if parsing fails or file doesn't exist
183+ AlertStateEntry :: new ( id, new_state)
184+ }
185+ }
186+ Err ( _) => {
187+ // File doesn't exist, create new entry
188+ AlertStateEntry :: new ( id, new_state)
189+ }
190+ } ;
191+
192+ // Update the state and only save if it actually changed
193+ let state_changed = alert_entry. update_state ( new_state) ;
194+
195+ if state_changed {
196+ let updated_bytes =
197+ serde_json:: to_vec ( & alert_entry) . map_err ( MetastoreError :: JsonParseError ) ?;
198+
199+ self . storage . put_object ( & path, updated_bytes. into ( ) ) . await ?;
200+ }
201+
202+ Ok ( ( ) )
203+ }
204+
205+ /// Delete an alert state file
206+ async fn delete_alert_state ( & self , obj : & dyn MetastoreObject ) -> Result < ( ) , MetastoreError > {
207+ let path = obj. get_object_path ( ) ;
208+ Ok ( self
209+ . storage
210+ . delete_object ( & RelativePathBuf :: from ( path) )
211+ . await ?)
212+ }
213+
118214 /// This function fetches all the llmconfigs from the underlying object store
119215 async fn get_llmconfigs ( & self ) -> Result < Vec < Bytes > , MetastoreError > {
120216 let base_path = RelativePathBuf :: from_iter ( [ SETTINGS_ROOT_DIRECTORY , "llmconfigs" ] ) ;
0 commit comments