From 8e7610031b49abd88a9f539b4c46262594fc4e10 Mon Sep 17 00:00:00 2001 From: Nahian Pathan Date: Thu, 26 Dec 2024 16:16:06 -0500 Subject: [PATCH] notify subscriber --- .../alarms/api/generated/alarms.generated.go | 198 +++++++++--------- internal/service/alarms/api/openapi.yaml | 15 +- internal/service/alarms/api/server.go | 118 ++++++++++- .../internal/alertmanager/alertmanager.go | 1 - ...3_create_alarm_event_record_table.down.sql | 14 +- ...003_create_alarm_event_record_table.up.sql | 125 +++++++---- ...reate_alarm_subscription_info_table.up.sql | 4 +- .../internal/db/models/alarm_event_record.go | 32 +-- .../internal/db/models/alarm_subscription.go | 10 +- .../alarms/internal/db/models/converters.go | 87 +++++++- .../internal/db/repo/alarms_repository.go | 83 +++++++- internal/service/alarms/serve.go | 29 ++- internal/service/common/notifier/event.go | 62 ++++-- 13 files changed, 557 insertions(+), 221 deletions(-) diff --git a/internal/service/alarms/api/generated/alarms.generated.go b/internal/service/alarms/api/generated/alarms.generated.go index 70752884..436c1de5 100644 --- a/internal/service/alarms/api/generated/alarms.generated.go +++ b/internal/service/alarms/api/generated/alarms.generated.go @@ -27,18 +27,18 @@ import ( // Defines values for AlarmEventNotificationNotificationEventType. 
const ( - ACKNOWLEDGE AlarmEventNotificationNotificationEventType = 3 - CHANGE AlarmEventNotificationNotificationEventType = 1 - CLEAR AlarmEventNotificationNotificationEventType = 2 - NEW AlarmEventNotificationNotificationEventType = 0 + AlarmEventNotificationNotificationEventTypeACKNOWLEDGE AlarmEventNotificationNotificationEventType = 3 + AlarmEventNotificationNotificationEventTypeCHANGE AlarmEventNotificationNotificationEventType = 1 + AlarmEventNotificationNotificationEventTypeCLEAR AlarmEventNotificationNotificationEventType = 2 + AlarmEventNotificationNotificationEventTypeNEW AlarmEventNotificationNotificationEventType = 0 ) // Defines values for AlarmSubscriptionInfoFilter. const ( - Acknowledge AlarmSubscriptionInfoFilter = "acknowledge" - Change AlarmSubscriptionInfoFilter = "change" - Clear AlarmSubscriptionInfoFilter = "clear" - New AlarmSubscriptionInfoFilter = "new" + AlarmSubscriptionInfoFilterACKNOWLEDGE AlarmSubscriptionInfoFilter = "ACKNOWLEDGE" + AlarmSubscriptionInfoFilterCHANGE AlarmSubscriptionInfoFilter = "CHANGE" + AlarmSubscriptionInfoFilterCLEAR AlarmSubscriptionInfoFilter = "CLEAR" + AlarmSubscriptionInfoFilterNEW AlarmSubscriptionInfoFilter = "NEW" ) // Defines values for AlertmanagerNotificationStatus. 
@@ -2158,97 +2158,97 @@ func (sh *strictHandler) GetProbableCause(w http.ResponseWriter, r *http.Request // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/+x9+XMbubH/v4KapCrrfDm8L+lbqVdaWbtWYst6kpytekvVChz0kIhnABrASGa8+t9f", - "4Zh7eEiW13Ke/YtNEtNo9PnpBjD+5AU8XnEGTEnv8JO3wgLHoECYTwGPY85+wyv6G18B03/DxyBKCPxE", - "ISJmDAEZCLpSlDPv0DvmcYyRBE1HAUERlQrxEIV6PBIQggAWgESKI0cKhYLHSC0BCZBJpNozNmMnOFhW", - "H0JUIuy+ZDiGFuIC6ck+JObnbBr9oywwMV8jGWG5BNlGP3ExY/ARx6sIWkUuNAM3AU+YEusbJJO5pcVD", - "+wt8VMAk5Uze2FkONZs3NzeamqHwm/la/i0f2XHk3LgZ+2UJDKkllSiTM6KS/UWhRAJBjLsF3NEoQnNI", - "eSNGJFbkiDoKRrLVgQhugSFqeF4jLPQvq4gGVEVrRJkblEjKFnrIjN1Ypm9yhtoz5rU8JyHv0DOSrq/J", - "a3lUK/xDAuaDHuYdemVZeC1PBkuIsTYUtV7pEVIJyhbe/X2rybzCJ7Art04rqa9kVQtQ1m70U85iEGbk", - "M8zMmdcGfexrYziKzEyWWmZAAlQimLG0z9D+47UeKRB1rV8CFsESBYIqEBQbHR5zpjBlEnEGWlUxF4Bk", - "eWCroiaIacAjzmQbGROoDDcmMGMqWUWAAktfewhmiK9AYMVFK7OR3HC0OotM3OIo0cZwtYTsORRgNmNz", - "PXidKjnkUcTv9ARWKtLo+Hf0Nn3md/QGsOHgMX9+n7Hf/exP4Z+P+KNpaXNl6kZTRm+wCpYgXYRxEglS", - "jeivjBA28oVu4MON/dRMi0oEHxIcaR/aQs7SWqhdtBYCsHYAtcRsE72UFtw8gBYXjXxaWpTt4suYTZg/", - "KTfKK9q5xgik3LrAAq1da8xpVReY07a0mDOKDbQIB4kYV6lxbODN0XJGsZkvTWmXXThaTvjbae2S/+/a", - "I6+yp0rJQj+k450mUKDjAqr7xOf/gkDVc8mMpY+68RvzCSqmk0Q2ABTfLYlJSmDGducPHWT/9gN8aAjo", - "rZP/fpGlkKtcLBpCaMJYLJJYg8RsgS5YVXk1THy4KQRAHq+wADljwRKC95k+rAb5TudvpxwZt9Ix1+o4", - "nUAimaxWXCgUJ5GiOoSngbgqRcNAOn8myhmrynJDKjb8UbUEgW5OLm+0bm/eXdYFTFmjgC9b7y5flNO0", - "E3LqIzozYtlKzUBPIFfYoBoN5xgA0cuYA5KJEDxhxJkNZYsI0IeEK5DtGdu+7iIiceZs8xC6idcoiBKp", - "QNw02o1BA3/JR/2lsp5MA1lm3ZCHjV1pPNIygMRaQYziRCoUa79FIRcWoWr7iUCZxEyoBgZ6SWZQg+3l", - "udUgm6aVUzljxZWiv2JG/lpxr0yBWkRa23vK4/9vcq+q6ncjNItbd0O0jJGcjxcb8ZnBWdvw2X36owHh", - "RxEW8cktMHXGFQ1pgC0yqwI1Mw6Zgag4Ekn9jeI6XOnxc11YtryV0N6rKJhJsH74KHjP+F0EZAFXVDNb", - "neIlVtDRPyGpcLxy8eNOy1Jjq1I4ydm+gIALgpZYojloC+WEhhRIuyTCfnfY97sTv9+76vcPB/3D/vR/", - "vJYXchFj5R16BCvwlWarVRVYq8Y+qfP+I+cRYOfviDJixMMWml+MYszwAnR0RXItFcSGXVygaEOwnqfE", - 
"txIJZAzN7RwZR8dLzBZAvq4we48S5ksIKTOufvqywdYK6dglCmt++WNIWE4d7nA/00D/hsUaYSl5QE1Y", - "uqNq6dzNESXoAiRPRABX6xWU14YHeNQdBRN/OiSBPwzC0J/jcOoPxuFgMB/ApBsGxbUmCSUbl1mQ6WmD", - "0eic9+7idWmNRTVYiFHmbxROxlOYHvh9DD1/OJkQ/+BgMvV7k0F3NO6NhmRE9ubvAlP5CAPabjOBQdFb", - "TKb7UJMJOJNJDOLShhjD4SZ5WjZXgt9Slzs1tymF1F5kgVKZ0QEhQa/Xn/q4NyT+cEzAn3YJ9sfD3nDe", - "O+gHcBDuI988Q5gASGxiw9F5KTDWHqstSILNpEyuIDC+iH7Q+FYqzAgWhP4byAuUh1v0w3tYyxfobkmD", - "pQV3mEZc5LK4BUa40NBjxjIMZ9pkClz7gjK7Pu1omSjxnCe25fHWP454QqwN2GTlFmItVi9kEfE5jsy4", - "JhfXqrJDUGBoUQJMZxUQ2nfpguUMX75524aSkggejg+mc+wHUzzxh/0D4mPojf3hdBj2RmF/2g9G+yiJ", - "FRKZseUrM6LKbCnfgUmBmpQDd5ozlsTe4a/dVq/Vbw2uC6x2s1kpU7Awqfmjr8f7t1iYhpN3+Kt3dvKL", - "1/KOXx2d/Xyi//H65OjCa3lHx/84e/vL65OXP5941/ctJ94LCJ8mliy5VJqFdsDjDu/TWPqUhQJLJZJA", - "JQLecEYV19Lq3PY6JmTIzjwchNOwD/44HEz84XTa9w/GmPhBd34QQDgOg+6wSdgrEAHQWyCXcAuCqrVe", - "xJ+FXoz3p07epO44dNI5rz1wb4DFHM8jOMaJhD2Tx3nxmVLSK8tjPghIL9CxH0aBP+yH4GMcjvzRQUgO", - "hjAY4Cnex6yEyy57spcO17WJwvoH67wBNmixGRl4g3kY9iYD4uMDMvKHvdHEn46moR8eDIbhOCQQDEcP", - "YVab/p4MG+PnYc74Pvwe9MiI9IZdH8JR3x/2xhN/PiaBfzAZYdKbHkzC/mA3v4bhDwkVGoT9Wokymxy6", - "MRvXVl7SWxNOqRtfPY02QLMm/NjkDaWccd0QUqs+bXZztuLshhxpShOcwtACBJUIM6u3FqLK1Y0a3ht0", - "f3Xx7qRSlmzFpkUmmvHFlS6Pshp1xVdJlMM1jHbADzNJoWCkZTi9EXz0po/Cq1uh9uNXkiPxGduExanM", - "UXilLiyu6+Bx64oAiz9MQ4GdbcsyHo4Nq25KvmY5gUw9UVneaDA+CKE79jGBuT88GI/9+TQM/eEEj8Mh", - "GQawH1jZp6I4zVGULjwZAqbEurSsAoU2mrHXPMBRtEYJox+0+qhaphg54DbMY5YhvjRDVdc4GfWDIfT7", - "fnc0ITq493VVMvcDPIB+SIZhOBo8SVXyeTbZ5Fy7ypXe5KEm+X8d+n8JoEeeFugNR+F4GII/BjL1h8Mp", - "8adhOPHD0XTY7R50cXfyn4+dGjFRPZzWNbER9TwM4tTEtw/meWNSoQV48lEAqPBrhnhK0g5xJBuBzVPY", - "9f2mNV6CuKUBHHMW0kUisi5seX1PElpeu5MVMShMsMLoPax91zfBVEjbpFY8z3ootrvVYRLlT2XVi43A", - "Nt9Lu4ymsCBA6eTE2TkIyhs0c5bEc5u4CF5Lsz9giS6pVFys3c6IAIWp7ZKbbGA5X2KJAsx0fJyDQa0R", - "v0v3cnvoB4LXLypZa1Avzqs+UuV5o4mWWlMs5HXV6TQ7x8H75tI9THQS/pDgyAb6dxenRgN552oleAAk", - "EXnEYPY7KRFG59zqtJzlio2LdilACPo5jbYCzHC7ODmfPLQtEml3BkiShb7LjU233piMwmA09AOArj8c", - 
"Dfr+wbQ/9vsaOk37wy705vvE402nTI7TzSrNrePOSpFws1tc2HgTsOJCGzQX2caMpZvn0WL7EM0YK++H", - "GA8wh0QEhFxAC9HQHC6J7D6QmTHLrabKwlGU8qXzdspDe8ZOldF0hYls922Ota9yVsotJX7yjWhrHjNW", - "g4HWLWwPy2Nw57W8wBQ8+h8as+sAn4dN77q6OVWLjK3uiyb9FOXWlC8b7MpC8ZLpIOP2hYahLv6iiAfp", - "LmQBqHwJO6vEiMyxm4MDCGW3f8SuDbZ8ZFmDK7yOOCYIS2QfmQNB2rjQUqmVPOx0VoLHoJaQyDblHcID", - "2cGaHGWLjo7OUnWCYmLp/OkO5kvO3/9mvzYGUM2kIOyBUaogbkq12uzzPLx3LqqJCBiRR0qP3Q9eh5Qt", - "QKwEZaouxZ/yH81BQWskawesQKgmigtgFgu/u3i9xSjtTrr+oDRBrMqozRLfFWIjPIfoMyUmFRbqQTKT", - "CqtE7sIrm4z10j7dCF7cF1gIvPays39HT2UcltzrJxCahk6C4ahRxz/i4H1E2fu8P5BLYg+lLgRPVv+A", - "dZ3wP2CdGaE7kovMaFNYGx9DP0B70dYzEyDJKtJChxcbp3kKWQgw4Vo0jn4aW8mC/bwpIb9dWcYLpwaK", - "lafLmMUfNR5MGKnFqTT6vhNRfRq3IaLHmIRiwmqmhwKDu1TcaPkiYSblHGWhsjz7K36HYszWqZ6X+Bbs", - "Fmn2aAqPZl6MP/5mx808r45LW94tCOkSx/Z8lA4smGWm1ILqW2mIf0jausxsY9/klU2c4gtd9EW3pkAM", - "qVnAdYOpv8KC3GEBhnB9vvMIB7DkEXFIYemGW1Ejd/ilYWHnTTVcQ4uJSts905DWKEpXFDxMz3NZ8FjK", - "LhkGQtKRTvF4XmKWtglbw9aoiKR6+20VHl+cXp0eH732Wt6bo7+/vdB/n56Zv385ujg7PfvZa3mnZy9P", - "rk4u3pyeHV1lG4onL80mYqlJUs/rJWHU9KytOQs7pWbRHNbc9ZDMAUgTz6x/WfmUejMEhJGVuRkScCFA", - "rjgj2jWPyi2IMoI7jnjw3gBouWbBUnBGJWWLduPuLm7qIL5KYsyQAEw0N0jBx6xwSlm0ef3RPJ5xAobP", - "M64ui1w27oruanOVW7q1LteXaGxVAkq9/2MkW24rXG9M3qV7AEfnp//MA1llpQVrsh1HjFwwy0rb89N2", - "A1gtxMYC2G932939QvlWRuV+nKb3VRwvcgfLeEWL9DO2fy2sxi3h/rqVg/BtOXm7vBvAWiLouYCQfixL", - "rvEswCnTFSMX685t79FS1aYbQfwSFKZRU1GRxZYjpQSdJwo+p9OlwxXLukqFwIUz6i1bVoWUgdn+wch1", - "29PyWeiKEjNEtXBiYCrrptQWTMyymtrASx10/CzowMdVhJlLkG46Cz2pRDwIEiHSm0lpXIqg0hM+5oxB", - "kJ6WJVjhOZaAdAVAEE8aC510+6aJxXcXp8VutS5vaLnyyTjdzCEyDYsYr9Ha7NGEiTCnqYuJgoY6smab", - "Z9Y5diFsuQF4XC0Bvbq6Onc4AwWcgOsd7BJlNqUuJJsQl6IqahSVXHKhWlWlyiSOsVhXZjJNmTY6Vfqp", - "JCL2yoTprtjEUuBR8c0ct8wtRFgps7pVIlZcggkyEQ9wRP9tzRKdhrYNRCVa0Fuzx0wQN0owrdCZZwLW", - "4TzC7P3Ma1lBZf6A5FKDZRxJ0wxLu1SlnZ5qYbHLlnAQcGHSpuLo9OTqJ3Tx0zEaHEzH6NfBdaOp1YRH", - "JQIW8ERge1gW2+0oPZHjUc5YRSGEB0nmsFlDKSVtqy5zUfLV1ZvXL+wuYckyUX6OO4Z4XmzegQSmWjNG", - 
"VdqA1lKUMomzRmJF0tUt00LvxlhkQYbtgMf7lCTFJJ3hbBeE6vn4vlAEv+RBgzO99S+OzhBe6WJLm6b5", - "3P7l53H7bf/0zaV/enZ1cvHT0fGJf9HtDvzb7rjd7aIf/p4wQP1uf6iL1kTXYdniSrFUtrkvMGtzsegQ", - "fscijsl/UfK38WRog5Ntm5u7EoEB/e5Q+wUQ9AqrGvW7u7u2ALLEykisHv7PT43aDffotJTOUH62zaI5", - "LTvn8d5+D+jU7rXq6brluXTnHXqDdrc90Mkfq6WReEcHGK2Czm2vE2AsfVuG2YZd2nLQiZHLhsLnwpZv", - "Mq0pjZ5K1Zfxd1deR2tXjIEsns831fUtppF2Ex0K7V6wa/Z7R3GpU2ntDKT6kZN1qiCwzTe8sv0Kylnn", - "X9IisPzmwWP6B9ZOc8tWIgG707riTNqs3+92dxzSdFUuQTIJApDSbK9ozQybHv0RE+TWqMeMmsacOrWZ", - "/j4IBEJwu1/k4n6uHLSxGjY2hhemjkvtwLvWREpmkVazqWl8Wt790+zcn+EY7ovGUVbcq7uK4op3/X/9", - "ZC+MaEvM74sUKXtVuW+7RXL9ZcyiXPc/1hZeldsBX9cayr2JLSaw4wSuxs+3hZJhAY3xQSXCXdTN7lWl", - "pYmOh1l5kuHZwraW27oymapsWj+DOoqirGJpVsKTWMCOWsyYROUOeUGriM/t7nCzBNLV6yXm+t/At8vM", - "/++z+a9UPQ1LaLa5r89Xo523K4b+M6iSaRUsPD0ssJ+Fp2fMN52FcBZfs8zG8V/QRDcf2NhqnShl6Lvp", - "Pdb0aiFWCQq3UDhw6mIYCirGkFpk8+/X9waiBcu6fZ3rrzdr/Euho60WtldKfBam3n6utm4Z6/WfB2Pn", - "AvKT0iGmEZD2N+eN5mze2lx+vaUkwVHljQnuKIdzwJI1tffz0KQh/r9bEazgu4N+d9DvDrqfg5rd8Kf0", - "zIdAu8JBLrkV05UG1krZJvnlQzo7X2t333oMjfDzHjaHE23l/BnRYK+toOZDqbUtoO9w9avB1bQkdbC1", - "Yu6Zy5W+N0nQtX7Sgy/GFHjhUKf+/OnPaRSdc7L+Uydv/rULB2ZKraQvliXrLxd5bFvn2LGennv+Izo6", - "94bZcng6NleGit71RVFGzYv3EV/vj2BiY+gwl6rcpvNzDR3D7sHz4Evn24gG3148s36AMGJw1xDHtoSx", - "xyKGzqfad6fk3npzBArqUOKl+b7iq7v74g3TbG2P7zpRc71PfCs4kFnOs3eg4fPg64wrdzr0W/Mga50I", - "PuJARWvzssE9Pai1F3Ku23rhzu60O+2R6cgfTQZjf9jvjXwc4rk/mfSH495oOIRxN33L2Fdxjq+Wvb4F", - "4Pvd+Z4Ojj/C/R6QwLaWudnRg+/17cPr2+K7UP7zStthd/A8+PqJizklBFj729ub3FxyF6ts98XD3Nph", - "0dIN+vudvv4FM3L9/UbPNiGXPPd7Ln4qvq7yU7pA6m87ucP2+KJJ19+yMxcTNnZeVfPlPXZ0vx1n/JK9", - "uaY3a3yNXaztzHzrO1nPDq+3n2UT7Pu23xfb9mPFrXnbrrOvPfsMJPTQM4kpBIsp42LzgcTsBkeM/8XF", - "xltgNYj1RpN91qcUvx88fNqDh3VD+pzjh6WrjlsL9/PyyD+i5i1f3H18wfttNmlwFFUu6BY1XVHc4/Td", - "+VS56nq/twV8GSRZv3n7PGu6imV+L+i+N1cba7Wy/251X03MULfe1HRRzd3CujTDSpfDDjsdcytyyaU6", - "nHa79g61m6z5fzbJ/4uOSiFn26Hum+YTUHpAE83Sa8KaJthy7qqB6Hn59QSFO7UbIoZsIJLpDhhZccqU", - 
"TN+nJZHCC/suLWn+Pyrtre7VSJQtUBBRYMrcsLXoy02Z3WK5v77/3wAAAP//e2MXQG10AAA=", + "H4sIAAAAAAAC/+x9+3MTObb/v6Lq3aod9uv2+5V8a+tWJoQhuxByk7BTdTE1kVunbS3dkpHUCV4m//st", + "PfrdfiSEIeyFX8C2+ujoPD/nSGo+ewGPV5wBU9I7/OytsMAxKBDmU8DjmLPf8Ir+xlfA9N/wKYgSAi8o", + "RMSMISADQVeKcuYdesc8jjGSoOkoICiiUiEeolCPRwJCEMACkEhx5EihUPAYqSUgATKJVHvGZuwEB8vq", + "Q4hKhN2XDMfQQlwgPdnHxPycTaN/lAUm5mskIyyXINvoBRczBp9wvIqgVeRCM3Ad8IQpsb5GMplbWjy0", + "v8AnBUxSzuS1neVQs3l9fa2pGQq/ma/l3/KRHUfOjZuxX5fAkFpSiTI5IyrZXxRKJBDEuFvALY0iNIeU", + "N2JEYkWOqKNgJFsdiOAGGKKG5zXCQv+yimhAVbRGlLlBiaRsoYfM2LVl+jpnqD1jXstzEvIOPSPp+pq8", + "lke1wj8mYD7oYd6hV5aF1/JksIQYa0NR65UeIZWgbOHd3bWazCt8BLty67SS+kZWtQBl7UY/5SwGYUa+", + "wMyceW3Qx742hqPIzGSpZQYkQCWCGUv7Au0/XOuRAlHX+iVgESxRIKgCQbHR4TFnClMmEWegVRVzAUiW", + "B7YqaoKYBjziTLaRMYHKcGMCM6aSVQQosPS1h2CG+AoEVly0MhvJDUers8jEDY4SbQxXS8ieQwFmMzbX", + "g9epkkMeRfxWT2ClIo2Of0dv0md+R68BGw4e8uf3Gfvdz/4U/vmAP5qWNlemrjVl9BqrYAnSRRgnkSDV", + "iP7KCGEjX+gaPl7bT820qETwMcGR9qEt5CythdpFayEAawdQS8w20UtpwfU9aHHRyKelRdkuvozZhPmT", + "cqO8op1rjEDKrQss0Nq1xpxWdYE5bUuLOaPYQItwkIhxlRrHBt4cLWcUm/nSlHbZhaPlhL+d1i75/649", + "8ip7qpQs9EM63mkCBTouoLpPfP4vCFQ9l8xY+qgbvzGfoGI6SWQDQPHdkpikBGZsd/7QQfZvP8HHhoDe", + "OvnvZ1kKucrFoiGEJozFIok1SMwW6IJVlVfDxMfrQgDk8QoLkDMWLCH4kOnDapDvdP52ypFxKx1zrY7T", + "CSSSyWrFhUJxEimqQ3gaiKtSNAyk82einLGqLDekYsMfVUsQ6Prk8lrr9vrtZV3AlDUK+LL19vJZOU07", + "Iac+ojMjlq3UDPQEcoUNqtFwjgEQvYw5IJkIwRNGnNlQtogAfUy4Atmese3rLiISZ842D6HreI2CKJEK", + "xHWj3Rg08Jd81F8q68k0kGXWDXnY2JXGIy0DSKwVxChOpEKx9lsUcmERqrafCJRJzIRqYKCXZAY12F6e", + "Ww2yaVo5lTNWXCn6K2bkrxX3yhSoRaS1vac8/v8m96qqfjdCs7h1N0TLGMn5eLYRnxmctQ2f3aU/GhB+", + "FGERn9wAU2dc0ZAG2CKzKlAz45AZiIojkdTfKK7DlR4/14Vly1sJ7b2KgpkE64ePgg+M30ZAFnBFNbPV", + "KZ5jBR39E5IKxysXP261LDW2KoWTnO0LCLggaIklmoO2UE5oSIG0SyLsd4d9vzvx+72rfv9w0D/sT//H", + "a3khFzFW3qFHsAJfabZaVYG1auyTOu8/cx4Bdv6OKCNGPGyh+cUoxgwvQEdXJNdSQWzYxQWKNgTreUp8", + "K5FAxtDczpFxdLzEbAHk2wqz9yBhPoeQMuPqp88bbK2Qjl2isOaXP4aE5dThDvczDfRvWKwRlpIH1ISl", + 
"W6qWzt0cUYIuQPJEBHC1XkF5bXiAR91RMPGnQxL4wyAM/TkOp/5gHA4G8wFMumFQXGuSULJxmQWZnjYY", + "jc55by9eldZYVIOFGGX+RuFkPIXpgd/H0POHkwnxDw4mU783GXRH495oSEZkb/4uMJUPMKDtNhMYFL3F", + "ZLr3NZmAM5nEIC5tiDEcbpKnZXMl+A11uVNzm1JI7UUWKJUZHRAS9Hr9qY97Q+IPxwT8aZdgfzzsDee9", + "g34AB+E+8s0zhAmAxCY2HJ2XAmPtsdqCJNhMyuQKAuOL6CeNb6XCjGBB6L+BPEN5uEU/fYC1fIZulzRY", + "WnCHacRFLosbYIQLDT1mLMNwpk2mwLUvKLPr046WiRLPeWJbHm/844gnxNqATVZuIdZi9UIWEZ/jyIxr", + "cnGtKjsEBYYWJcB0VgGhfZcuWM7w5es3bSgpieDh+GA6x34wxRN/2D8gPobe2B9Oh2FvFPan/WC0j5JY", + "IZEZW74yI6rMlvIdmBSoSTlwpzljSewdvuu2eq1+a/C+wGo3m5UyBQuTmj/5erx/g4VpOHmH77yzk1+9", + "lnf88ujslxP9j1cnRxdeyzs6/sfZm19fnTz/5cR7f9dy4r2A8HFiyZJLpVloBzzu8D6NpU9ZKLBUIglU", + "IuA1Z1RxLa3OTa9jQobszMNBOA374I/DwcQfTqd9/2CMiR905wcBhOMw6A6bhL0CEQC9AXIJNyCoWutF", + "/FnoxXh/6uRN6o5DJ53z2gN3BljM8TyCY5xI2DN5nBefKSW9sjzmg4D0Ah37YRT4w34IPsbhyB8dhORg", + "CIMBnuJ9zEq47LIne+lwXZsorH+wzhtggxabkYE3mIdhbzIgPj4gI3/YG0386Wga+uHBYBiOQwLBcHQf", + "ZrXp78mwMX4e5ozvw+9Bj4xIb9j1IRz1/WFvPPHnYxL4B5MRJr3pwSTsD3bzaxj+mFChQdi7SpTZ5NCN", + "2bi28pLemnBK3fjqabQBmjXhxyZvKOWM9w0hterTZjdnK85uyJGmNMEpDC1AUIkws3prIapc3ajhvUH3", + "VxdvTyplyVZsWmSiGV9c6fIoq1FXfJVEOVzDaAf8MJMUCkZahtMbwUdv+iC8uhVqP3wlORKfsU1YnMoc", + "hVfqwuK6Dh62rgiw+MM0FNjZtizj/tiw6qbkW5YTyNQTleWNBuODELpjHxOY+8OD8difT8PQH07wOByS", + "YQD7gZV9KorTHEXpwpMhYEqsS8sqUGijGXvFAxxFa5Qw+lGrj6plipEDbsM8ZhniSzNUdY2TUT8YQr/v", + "d0cTooN7X1clcz/AA+iHZBiGo8GjVCVfZpNNzrWrXOlN7muS/9eh/9cAeuRxgd5wFI6HIfhjIFN/OJwS", + "fxqGEz8cTYfd7kEXdyf/+dipERPVw2ldExtRz/0gTk18+2Ce1yYVWoAnHwSACr9miKck7RBHshHYPIZd", + "321a4yWIGxrAMWchXSQi68KW1/cooeWVO1kRg8IEK4w+wNp3fRNMhbRNasXzrIdiu1sdJlH+VFa92Ahs", + "8720y2gKCwKUTk6cnYOgvEEzZ0k8t4mL4LU0+wOW6JJKxcXa7YwIUJjaLrnJBpbzJZYowEzHxzkY1Brx", + "23Qvt4d+Inj9rJK1BvXivOojVZ43mmipNcVCXledTrNzHHxoLt3DRCfhjwmObKB/e3FqNJB3rlaCB0AS", + "kUcMZr+TEmF0zq1Oy1mu2LholwKEoF/SaCvADLeLk/PJQ9sikXZngCRZ6Lvc2HTrjckoDEZDPwDo+sPR", + "oO8fTPtjv6+h07Q/7EJvvk883nTK5DjdrNLcOu6sFAk3u8WFjTcBKy60QXORbcxYunkeLbYP0Yyx8n6I", + 
"8QBzSERAyAW0EA3N4ZLI7gOZGbPcaqosHEUpXzpvpzy0Z+xUGU1XmMh23+ZY+ypnpdxS4iffiLbmMWM1", + "GGjdwvaw9mpFFXVnx9dUURRRU2psMCGLuktWgoyHF3qDus6LIh6kG44FTPI1TKoSDjIfbo4DIJTd6RG7", + "9tLykWVlrfA64pggLJF9ZA4EaTtCS6VW8rDTWQkeg1pCItuUdwgPZAdrcpQtOjoQS9UJijmk86dbmC85", + "//Cb/droupo0QdizoVRB3JRVtYXnKXfvtFMTETAij5Qeux+SDilbgFgJylRdii/yH82ZQGska4ehQKgm", + "igtgFva+vXi1xSjtprn+oDRBrMoAzRLfFU0jPIfoCyUmFRbqXjKTCqtE7oImm4z10j7diFPcF1gIvPay", + "Y35Hj2UcltyrRxCaRkmC4ahRxz/j4ENE2Ye8FZBLYg+lLgRPVv+AdZ3wP2CdGaE7fYvMaFNDGx9DP0F7", + "0dYzEyDJKtJCh2cbp3kMWQgwEFQ0jn4cW8mC/bwp975ZWcYLBwSKRaZLjsUfNfRLGKnFqTT6vhVRfRq3", + "96HHmIRiwmqmhwKDu1TcaPkiYSblHGWhsjz7S36LYszWqZ6X+Absbmj2aIqEZl6MP/1mx808rw5BW94N", + "COkSx/Z8lA4smGWm1ILqW2mIv0/ausxsY9/klU2cQgld30U3phYMqVnA+wZTf4kFucUCDOH6fOcRDmDJ", + "I+KQwtINt6JG7pxLw8LOm8q1hm4SlbZRptGrUZQuHniYHt2yOLGUXbJCEElHOoXeeTVZ2hFsDVujImjq", + "7bcreHxxenV6fPTKa3mvj/7+RsOw16dn5u9fjy7OTs9+8Vre6dnzk6uTi9enZ0dXGWA7eW72C0v9kHpe", + "LwmjpmdtzVnYKfWF5rDmrl1kzjqaeGb9y8qn1IYhIIyszCWQgAsBcsUZ0a55VO42lBHcccSDDwYryzUL", + "loIzKilbtBs3cnFTs/BlEmOGBGCiuUEKPmU1UsqizesP5vGMEzB8nnF1WeSycQN0V0er3L2tNbS+Rg+r", + "ElDqrR4j2XIH4f3G5F068n90fvrPPJBVVlqwJttcxMgFs6yKPT9tN4DVQmwsgP12t93dL5RvZVTux2l6", + "NcXxInewjFe0SD9j+11hNW4Jd+9bOQjflpO3y7sBrCWCngsI6aey5Bq3/U+ZLg65WHdueg+WqjbdCOLn", + "oDCNmoqKLLYcKSXoPFHwJU0tHa5Y1kAqBC6cUW/ZsiqkDMxOD0ausZ5WykJXlJghqoUTA1NZ46S2YGKW", + "1dTxXeqg42dBBz6tIsxcgnTTWehJJeJBkAiRXkJK41IElfbvMWcMgvRgLMEK67If6QqAIJ40FjrpTk0T", + "i28vTouNaV3e0HLlk3G6mUNkehMxXqO12Y4JE2EOThcTBQ11ZM32yaxz7ELYcgPwuFoCenl1de5wBgo4", + "Adc72CXKbEpdSDYhLkVV1CgqueRCtapKlUkcY7GuzGT6L210qvRTSUTs7Qizc2wTS4FHxTdz3DIXDmGl", + "zOpWiVhxCSbIRDzAEf23NUt0GtqOD5VoQW/MdjJB3CjBdD1nnglYh/MIsw8zr2UFlfkDkksNlnEkTd8r", + "bUiVNnWqhcUuW8JBwIVJm4qj05OrF+jixTEaHEzH6N3gfaOp1YRHJQIW8ERgey4W250nPZHjUc5YRSGE", + "B0nmsFlDKSVtqy5zJ/Ll1etXz+yGYMkyUX5kO4Z4XuzTgQSmWjNGVdpr1lKUMomznmFF0tXd0ULvxlhk", + "QYbtgMf7lCTFJJ3hbBeE6vn4rlAEP+dBgzO98S+OzhBe6WJLm6b53P71l3H7Tf/09aV/enZ1cvHi6PjE", + 
"v+h2B/5Nd9zudtFPf08YoH63P9RFa6LrsGxxpVgq29wXmLW5WHQIv2URx+S/KPnbeDK0wcl2yM21iMCA", + "fnd+/QIIeolVjfrt7W1bAFliZSRWD//np0bthnt0WkpnKD/GZtGclp3zeG+/B3Rq91r1dN3yXLrzDr1B", + "u9se6OSP1dJIvKMDjFZB56bXCTCWvi3DbMMubTnoxMhlQ+FzYcs3mdaURk+l6sv4uyuvo7UrxkAWj+Kb", + "6voG00i7iQ6FdtvX9fW9o7jUqbR2BlL9zMk6VRDY5hte2X4F5azzL2kRWH7J4CH9A2unuWUrkYDdVF1x", + "Jm3W73e7O85juiqXIJkEAUhpdlK0ZoZNj/6MCXJr1GNGTWNOndpMKx8EAiG43RpycT9XDtpYDRsbwwtT", + "x6V24L3XREpmkVazqWl8Xt7+02zSn+EY7orGUVbcy9uK4orX+t99tndDtCXmV0OKlL2q3LddGHn/dcyi", + "XPc/1BZeltsB39Yayr2JLSaw47Ctxs83hZJhAY3xQSXC3cnNrlClpYmOh1l5kuHZwg6W26UymapsWr+A", + "OoqirGJpVsKjWMCOWsyYROW6eEGriM/tRnCzBNLV6yXm+t/At8vM/++L+a9UPQ1LaLa5b89Xo523K4b+", + "C6iSaRUsPD0XsJ+Fp8fJNx17cBZfs8zG8V/RRDefzdhqnShl6IfpPdT0aiFWCQo3UDhb6mIYCirGkFqk", + "bFLb+zuD0IJl3bzO9debFf61wNFWA9srIz4JS28/VVO3jPX6T4OxcwH5megQ0whI+7tzRnMKb22uud5Q", + "kuCo8m4Ed5LD+WfJmtp7OWjSEP3frghW8MM/f/jnD//czz/NXvgjOuZ9cF3hFJfcCuhKA2t1bJP48iGd", + "na+vu2s9hEb4ZQ+bQ4i2bP6CYLDXPlDz4dPa/s8PrPrNsGpajzrMWjH3zONK35sc6Po+6akXYwq8cHhT", + "f/785zSIzjlZ/6mTd/7ahdMypT7SV0uS9ZeIPLSnc+xYT883/xHtnDvDbDk8HZurQUXv+qogo+bF+4iv", + "90cwsTF0mMtTbsf5qYaOYffgafCl02lEg+8vnlk/QBgxuG2IY1vC2EMRQ+dz7btTcme9OQIFdSjx3Hxf", + "8dXdTfGGabb2xncdp3m/T3wrOJBZzpN3oOHT4OuMK3c09HvzIGudCD7hQEVr81LBPT2otRdyrtt64W7u", + "tDvtkenIH00GY3/Y7418HOK5P5n0h+PeaDiEcTd9m9g3cY5vlr2+B+D7w/keD44/wP3ukcC2lrnZuYMf", + "9e3969viO0/+80rbYXfwNPh6wcWcEgKs/f1tTG4uuYtVtvvifm7tsGjppvzdTl//ihm5/h6jJ5uQS577", + "Ixc/Fl9X+RFdIPW3mtxie3bRpOvv2ZmLCRs7r6r58h77ud+PM37N3lzTGzS+xSbWdma+942sJ4fX20+y", + "CfZj1++r7fqx4sa8bdfZ15t9ARK674HEFILFlHGx+TRidn0jxv/iYuMVsBrEeq3JPukjij9OHT7uqcO6", + "IX3J2cPSPcethft5eeQfUfOWb+0+vOD9Pps0OIoqt3OLmq4o7mH67nyu3HO929sCvg6SrF+7fZo1XcUy", + "fxR0P5qrjbVa2X+3uq8mZqhbb2q6peauYF2aYaWbYYedjrkSueRSHU67XXuB2k3W/D+Y5P8VR6WQs+3Q", + "pkcaj0vlTzceltpEq/TmsCZeyg3oOpnz8osLCrdtN4STJiKZYoGRFadMyfRNWxIpvLBv2ZLmP6XSruxe", + "mkTZAgURBabM3VsLzdyU2f2Wu/d3/xsAAP//8r7VBnJ0AAA=", } // GetSwagger returns the content of the embedded 
swagger specification file diff --git a/internal/service/alarms/api/openapi.yaml b/internal/service/alarms/api/openapi.yaml index 2e167d93..f31db739 100644 --- a/internal/service/alarms/api/openapi.yaml +++ b/internal/service/alarms/api/openapi.yaml @@ -18,8 +18,9 @@ servers: tags: - name: alarms description: Alarm management - - name: service configuration - - name: Alarm Service Configuration + - name: serviceConfiguration + description: Alarm Service Configuration + - name: subscriptions description: Alarm subscription management - name: probableCauses description: Probable cause information @@ -227,7 +228,7 @@ paths: operationId: GetServiceConfiguration summary: Retrieve the alarm service configuration tags: - - service configuration + - serviceConfiguration responses: '200': description: Successful response @@ -252,7 +253,7 @@ paths: operationId: UpdateAlarmServiceConfiguration summary: Modify all fields of the Alarm Service Configuration. tags: - - service configuration + - serviceConfiguration requestBody: required: true content: @@ -289,7 +290,7 @@ paths: operationId: PatchAlarmServiceConfiguration summary: Modify individual fields of the Alarm Service Configuration. tags: - - service configuration + - serviceConfiguration requestBody: required: true content: @@ -709,13 +710,13 @@ components: example: 16d5fc54-cee0-4532-9826-2369f8240e1b filter: type: string - enum: [ new, change, clear, acknowledge ] + enum: [ NEW, CHANGE, CLEAR, ACKNOWLEDGE ] description: | Criteria for events which do not need to be reported or will be filtered by the subscription notification service. Therefore, if a filter is not provided then all events are reported. It can be filtered by criteria based on the type of notification of fields of the AlarmEventRecord. 
- example: (eq,perceivedSeverity,0) + example: NEW callback: type: string format: uri diff --git a/internal/service/alarms/api/server.go b/internal/service/alarms/api/server.go index c1b6e511..caee1053 100644 --- a/internal/service/alarms/api/server.go +++ b/internal/service/alarms/api/server.go @@ -7,8 +7,11 @@ import ( "log/slog" "net/http" "strings" + "sync" "time" + "github.com/openshift-kni/oran-o2ims/internal/service/common/notifier" + "github.com/google/uuid" api "github.com/openshift-kni/oran-o2ims/internal/service/alarms/api/generated" @@ -40,6 +43,10 @@ type AlarmsServer struct { AlarmsRepository *repo.AlarmsRepository // ClusterServer contains the cluster server client and fetched objects ClusterServer *clusterserver.ClusterServer + // HttpClient HTTP client capable of acquiring an OAuth token used to authorize client requests + HttpClient *http.Client + // Logger used when global logger may not be available + Logger *slog.Logger } // AlarmsServer implements StrictServerInterface. 
This ensures that we've conformed to the `StrictServerInterface` with a compile-time check @@ -50,7 +57,7 @@ var baseURL = "/o2ims-infrastructureMonitoring/v1" var currentVersion = "1.0.0" // GetAllVersions receives the API request to this endpoint, executes the request, and responds appropriately -func (r *AlarmsServer) GetAllVersions(ctx context.Context, request api.GetAllVersionsRequestObject) (api.GetAllVersionsResponseObject, error) { +func (a *AlarmsServer) GetAllVersions(ctx context.Context, request api.GetAllVersionsRequestObject) (api.GetAllVersionsResponseObject, error) { // We currently only support a single version versions := []common.APIVersion{ { @@ -65,7 +72,7 @@ func (r *AlarmsServer) GetAllVersions(ctx context.Context, request api.GetAllVer } // GetMinorVersions receives the API request to this endpoint, executes the request, and responds appropriately -func (r *AlarmsServer) GetMinorVersions(ctx context.Context, request api.GetMinorVersionsRequestObject) (api.GetMinorVersionsResponseObject, error) { +func (a *AlarmsServer) GetMinorVersions(ctx context.Context, request api.GetMinorVersionsRequestObject) (api.GetMinorVersionsResponseObject, error) { // We currently only support a single version versions := []common.APIVersion{ { @@ -137,6 +144,7 @@ func (a *AlarmsServer) CreateSubscription(ctx context.Context, request api.Creat }), nil } + slog.Info("Successfully created Alarm Subscription", "record", record) return api.CreateSubscription201JSONResponse(models.ConvertSubscriptionModelToApi(*record)), nil } @@ -479,6 +487,8 @@ func (a *AlarmsServer) GetProbableCause(ctx context.Context, request api.GetProb // AmNotification handles an API request coming from AlertManager with CaaS alerts. This api is used internally. 
// Note: the errors returned can also be view under alertmanager pod logs but also logging here for convenience func (a *AlarmsServer) AmNotification(ctx context.Context, request api.AmNotificationRequestObject) (api.AmNotificationResponseObject, error) { + // TODO: AM auto retries if it receives 5xx error code. That means any error, even if permanent (e.g postgres syntax or host not found), will be processed the same way. Once we have a better retry mechanism for pg, update all 5xx to 4xx as needed. + // Audit the table with full list of alerts in the current payload. If missing set them to resolve if err := a.AlarmsRepository.ResolveNotificationIfNotInCurrent(ctx, request.Body); err != nil { msg := "failed to resolve notification that are not present" @@ -504,12 +514,112 @@ func (a *AlarmsServer) AmNotification(ctx context.Context, request api.AmNotific return nil, fmt.Errorf("%s: %w", msg, err) } - //TODO: Get subscriber + // Get all subscriptions + subscriptions, err := a.AlarmsRepository.GetAlarmSubscriptions(ctx) + if err != nil { + msg := "failed to get all subscriptions" + slog.Error(msg, "error", err) + return nil, fmt.Errorf("%s: %w", msg, err) + } + // No subscription, noop + if len(subscriptions) == 0 { + slog.Info("No subscriptions to notify") + return api.AmNotification200Response{}, nil + } + + // Notify subscriber + slog.Info("Processing subscriptions") + if err := a.processSubscriptions(ctx, subscriptions); err != nil { + msg := "failed to process at least one subscription" + slog.Error(msg, "error", err) + return nil, fmt.Errorf("%s: %w", msg, err) + } - //TODO: Notify subscriber + slog.Info("Successfully handled all alertmanager alerts and notified subscriptions") return api.AmNotification200Response{}, nil } +// Max number of subscriptions that can be notified at the same time +const subsMaxConcurrent = 5 + +// processSubscriptions notifies a list of O-RAN subscriptions concurrently, at most subsMaxConcurrent at a time, and returns a single error describing every subscription that failed +func (a *AlarmsServer) processSubscriptions(ctx context.Context, 
subscriptions []models.AlarmSubscription) error { + maxConcurrent := subsMaxConcurrent + sem := make(chan struct{}, maxConcurrent) + errChan := make(chan struct { + err error + subID string + }, len(subscriptions)) + + var wg sync.WaitGroup + + for _, subscription := range subscriptions { + wg.Add(1) + sem <- struct{}{} // Acquire semaphore + go func(sub models.AlarmSubscription) { + defer wg.Done() + defer func() { <-sem }() // Release semaphore + + // Notify + if err := a.notifySubscription(ctx, sub); err != nil { + errChan <- struct { + err error + subID string + }{err, sub.SubscriptionID.String()} + } + }(subscription) + } + + go func() { + wg.Wait() + close(errChan) + }() + + var failedNotifications []string + errCount := 0 + for e := range errChan { + errCount++ + failedNotifications = append(failedNotifications, fmt.Sprintf("subscription %s: %v", e.subID, e.err)) + } + + if errCount > 0 { + return fmt.Errorf("failed to notify %d subscriptions: %s", errCount, strings.Join(failedNotifications, "::")) + } + + slog.Info("Successfully processed all subscriptions", "count", len(subscriptions)) + return nil +} + +// notifySubscription collects the latest alarms for a given subscription (with possible filter), sends them to the subscription callback URL and, on success, updates its event cursor +func (a *AlarmsServer) notifySubscription(ctx context.Context, subscription models.AlarmSubscription) error { + subAlarms, err := a.AlarmsRepository.GetAlarmsForSubscription(ctx, subscription) + if err != nil { + return fmt.Errorf("get alarms failed for %s: %w", subscription.SubscriptionID, err) + } + + if len(subAlarms) > 0 { + // Call the subscription url with notification payload + notifyPayload := notifier.Notification{ + NotificationID: subscription.SubscriptionID, + Payload: models.ConvertAlarmEventRecordModelsToAlarmEventNotifications(subAlarms, subscription, a.GlobalCloudID), + } + if err := notifier.CallUrl(ctx, a.Logger, a.HttpClient, subscription.Callback, notifyPayload); err != nil { + return fmt.Errorf("call url 
failed for %s: %w", subscription.SubscriptionID, err) + } + + // Update the new event cursor with the latest + latestSequence := subAlarms[len(subAlarms)-1].AlarmSequenceNumber + subscription.EventCursor = subAlarms[0].AlarmSequenceNumber + if err := a.AlarmsRepository.UpdateSubscriptionEventCursor(ctx, subscription, latestSequence); err != nil { + return fmt.Errorf("update subscription failed for %s: %w", subscription.SubscriptionID, err) + } + + slog.Info("Subscription have been notified", "subscription", subscription.SubscriptionID) + } + + return nil +} + func (a *AlarmsServer) HwNotification(ctx context.Context, request api.HwNotificationRequestObject) (api.HwNotificationResponseObject, error) { // TODO implement me return nil, fmt.Errorf("not implemented") diff --git a/internal/service/alarms/internal/alertmanager/alertmanager.go b/internal/service/alarms/internal/alertmanager/alertmanager.go index 408eaefc..cb5b8077 100644 --- a/internal/service/alarms/internal/alertmanager/alertmanager.go +++ b/internal/service/alarms/internal/alertmanager/alertmanager.go @@ -74,7 +74,6 @@ func Setup(ctx context.Context, cl client.Client) error { func ConvertAmToAlarmEventRecordModels(am *api.AlertmanagerNotification, aDefinitionRecords []models.AlarmDefinition, clusterIDToObjectTypeID map[uuid.UUID]uuid.UUID) []models.AlarmEventRecord { records := make([]models.AlarmEventRecord, 0, len(am.Alerts)) for _, alert := range am.Alerts { - slog.Info("Converting Alertmanager alert", "alert name", GetAlertName(*alert.Labels)) record := models.AlarmEventRecord{ AlarmRaisedTime: *alert.StartsAt, AlarmClearedTime: setTime(*alert.EndsAt), diff --git a/internal/service/alarms/internal/db/migrations/000003_create_alarm_event_record_table.down.sql b/internal/service/alarms/internal/db/migrations/000003_create_alarm_event_record_table.down.sql index d97db248..5024bdb5 100644 --- a/internal/service/alarms/internal/db/migrations/000003_create_alarm_event_record_table.down.sql +++ 
b/internal/service/alarms/internal/db/migrations/000003_create_alarm_event_record_table.down.sql @@ -1,18 +1,12 @@ --- Drop the trigger for updating timestamp updated_at in alarm_event_record -DROP TRIGGER IF EXISTS track_alarm_event_record_change_time ON alarm_event_record; +-- Drop the trigger for managing events +DROP TRIGGER IF EXISTS manage_alarm_event ON alarm_event_record; --- Drop the function for updating timestamp updated_at in alarm_event_record -DROP FUNCTION IF EXISTS track_alarm_event_record_change_time; +-- Drop the function for managing events +DROP FUNCTION IF EXISTS manage_alarm_event; -- Remove the default for alarm_sequence_number that uses alarm_sequence_seq ALTER TABLE alarm_event_record ALTER COLUMN alarm_sequence_number DROP DEFAULT; --- Drop the trigger for updating alarm_sequence_number in alarm_event_record -DROP TRIGGER IF EXISTS update_alarm_event_sequence ON alarm_event_record; - --- Drop the trigger function for updating alarm_sequence_number -DROP FUNCTION IF EXISTS update_alarm_event_sequence; - -- Drop the sequence for alarm_sequence_number DROP SEQUENCE IF EXISTS alarm_sequence_seq; diff --git a/internal/service/alarms/internal/db/migrations/000003_create_alarm_event_record_table.up.sql b/internal/service/alarms/internal/db/migrations/000003_create_alarm_event_record_table.up.sql index 78a3c92d..2e428039 100644 --- a/internal/service/alarms/internal/db/migrations/000003_create_alarm_event_record_table.up.sql +++ b/internal/service/alarms/internal/db/migrations/000003_create_alarm_event_record_table.up.sql @@ -23,6 +23,7 @@ CREATE TABLE IF NOT EXISTS alarm_event_record ( -- O-RAN additional data to create AlarmEventNotification object_id UUID, -- Same as manager_cluster_id for caas alerts. Note: nullable to capture if ACM is sending us cluster ID object_type_id UUID, -- Derived from manager_cluster_id. 
Note: nullable to capture if ACM is sending us cluster ID + notification_event_type VARCHAR(20) DEFAULT 'NEW' NOT NULL, -- Same as alarm_subscription_info.filter used to quickly filter and return notification -- Internal alarm_status VARCHAR(20) DEFAULT 'firing' NOT NULL, -- Status of the alarm (either 'firing' or 'resolved'). This is also used to archive it later. @@ -32,64 +33,102 @@ CREATE TABLE IF NOT EXISTS alarm_event_record ( CONSTRAINT unique_fingerprint_alarm_raised_time UNIQUE (fingerprint, alarm_raised_time), -- Unique constraint to prevent duplicate caas alert with the same fingerprint and time CONSTRAINT chk_status CHECK (alarm_status IN ('firing', 'resolved')), -- Check constraint to enforce status as either 'firing' or 'resolved' - CONSTRAINT chk_perceived_severity CHECK (perceived_severity IN (0, 1, 2, 3, 4, 5)) -- Check constraint to restrict perceived_severity to valid integer values. See generated ENUMs in server for more. + CONSTRAINT chk_perceived_severity CHECK (perceived_severity IN (0, 1, 2, 3, 4, 5)), -- Check constraint to restrict perceived_severity to valid integer values. See generated ENUMs in server for more. 
/*
manage_alarm_event keeps three derived columns of alarm_event_record up to date
on every row write: alarm_changed_time, notification_event_type and
alarm_sequence_number. It is invoked by the BEFORE INSERT OR UPDATE trigger of
the same name.

INSERT
- alarm_changed_time starts out equal to alarm_raised_time.
- notification_event_type is forced to 'CLEAR' when the row arrives already
  resolved; any other row keeps the value it was given (column default 'NEW').
- alarm_sequence_number is not touched here (auto-incremented).

UPDATE (the first rule that matches wins)
1. Row is resolved: notification_event_type is 'CLEAR'. alarm_changed_time and
   alarm_sequence_number advance only when the previous status was not
   'resolved'.
2. Row is acknowledged: notification_event_type is 'ACKNOWLEDGE'.
   alarm_sequence_number advances only when alarm_acknowledged differs from
   its previous value. alarm_changed_time is never modified by this rule.
3. Row is not acknowledged: when alarm_status, alarm_cleared_time,
   perceived_severity, object_id, object_type_id, alarm_definition_id or
   probable_cause_id differs from the previous row, notification_event_type
   is 'CHANGE' and both alarm_changed_time and alarm_sequence_number advance.
*/
CREATE OR REPLACE FUNCTION manage_alarm_event()
RETURNS TRIGGER AS $$
DECLARE
    tracked_attribute_changed BOOLEAN;
BEGIN
    -- New alarm: seed the change time and flag rows that arrive already resolved.
    IF TG_OP = 'INSERT' THEN
        NEW.alarm_changed_time := NEW.alarm_raised_time;

        IF NEW.alarm_status = 'resolved' THEN
            NEW.notification_event_type := 'CLEAR';
        END IF;

        RETURN NEW;
    END IF;

    -- Any operation other than UPDATE passes through untouched.
    IF TG_OP <> 'UPDATE' THEN
        RETURN NEW;
    END IF;

    IF NEW.alarm_status = 'resolved' THEN
        -- 1. Resolved rows always report CLEAR (highest priority).
        NEW.notification_event_type := 'CLEAR';

        -- Time and sequence move only on the transition into 'resolved'.
        IF OLD.alarm_status IS DISTINCT FROM 'resolved' THEN
            NEW.alarm_changed_time := CURRENT_TIMESTAMP;
            NEW.alarm_sequence_number := nextval('alarm_sequence_seq');
        END IF;

    ELSIF NEW.alarm_acknowledged THEN
        -- 2. Acknowledged rows report ACKNOWLEDGE; alarm_changed_time is left alone.
        NEW.notification_event_type := 'ACKNOWLEDGE';

        -- Sequence moves only when the acknowledged flag itself changed.
        IF NEW.alarm_acknowledged IS DISTINCT FROM OLD.alarm_acknowledged THEN
            NEW.alarm_sequence_number := nextval('alarm_sequence_seq');
        END IF;

    ELSIF NOT NEW.alarm_acknowledged THEN
        -- 3. Unacknowledged rows report CHANGE when a tracked attribute moved.
        tracked_attribute_changed :=
            NEW.alarm_status        IS DISTINCT FROM OLD.alarm_status OR
            NEW.alarm_cleared_time  IS DISTINCT FROM OLD.alarm_cleared_time OR
            NEW.perceived_severity  IS DISTINCT FROM OLD.perceived_severity OR
            NEW.object_id           IS DISTINCT FROM OLD.object_id OR
            NEW.object_type_id      IS DISTINCT FROM OLD.object_type_id OR
            NEW.alarm_definition_id IS DISTINCT FROM OLD.alarm_definition_id OR
            NEW.probable_cause_id   IS DISTINCT FROM OLD.probable_cause_id;

        IF tracked_attribute_changed THEN
            NEW.notification_event_type := 'CHANGE';
            NEW.alarm_changed_time := CURRENT_TIMESTAMP;
            NEW.alarm_sequence_number := nextval('alarm_sequence_seq');
        END IF;
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Run manage_alarm_event for every row inserted into or updated in alarm_event_record
CREATE TRIGGER manage_alarm_event
    BEFORE INSERT OR UPDATE
    ON alarm_event_record
    FOR EACH ROW
    EXECUTE FUNCTION manage_alarm_event();
a/internal/service/alarms/internal/db/migrations/000004_create_alarm_subscription_info_table.up.sql b/internal/service/alarms/internal/db/migrations/000004_create_alarm_subscription_info_table.up.sql index eea2cadb..48052e42 100644 --- a/internal/service/alarms/internal/db/migrations/000004_create_alarm_subscription_info_table.up.sql +++ b/internal/service/alarms/internal/db/migrations/000004_create_alarm_subscription_info_table.up.sql @@ -7,7 +7,7 @@ CREATE TABLE IF NOT EXISTS alarm_subscription_info ( consumer_subscription_id UUID NULL, -- Optional ID for the consumer's subscription identifier -- filter set nullable as false, but description says that if not set, all alarms are included -- setting as NULL based on description and model definition in O-RAN.WG6.O2IMS-INTERFACE-R004-v07.00 - filter VARCHAR(20) NULL, -- Can be [new, change, clear, acknowledge], NULL means all + filter VARCHAR(20) NULL, -- Can be ['NEW', 'CHANGE', 'CLEAR', 'ACKNOWLEDGE'], NULL means all callback TEXT NOT NULL, -- URL or endpoint for sending notifications -- Internal @@ -16,7 +16,7 @@ CREATE TABLE IF NOT EXISTS alarm_subscription_info ( updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP -- Record last update timestamp -- check does not fail with NULL - CONSTRAINT chk_filter CHECK (filter IN ('new', 'change', 'clear', 'acknowledge')), + CONSTRAINT chk_filter CHECK (filter IN ('NEW', 'CHANGE', 'CLEAR', 'ACKNOWLEDGE')), CONSTRAINT unique_callback UNIQUE (callback) ); diff --git a/internal/service/alarms/internal/db/models/alarm_event_record.go b/internal/service/alarms/internal/db/models/alarm_event_record.go index 9f99fb71..8ee6f4f6 100644 --- a/internal/service/alarms/internal/db/models/alarm_event_record.go +++ b/internal/service/alarms/internal/db/models/alarm_event_record.go @@ -9,22 +9,22 @@ import ( // AlarmEventRecord represents a record in the alarm_event_record table. 
type AlarmEventRecord struct {
	AlarmEventRecordID    uuid.UUID                             `db:"alarm_event_record_id"`
	AlarmDefinitionID     *uuid.UUID                            `db:"alarm_definition_id"` // Nullable: ACM may not provide the cluster ID. Track such cases manually and report them to ACM.
	ProbableCauseID       *uuid.UUID                            `db:"probable_cause_id"`   // Nullable: ACM may not provide the cluster ID. Track such cases manually and report them to ACM.
	AlarmRaisedTime       time.Time                             `db:"alarm_raised_time"`
	AlarmChangedTime      *time.Time                            `db:"alarm_changed_time"` // Maintained by the manage_alarm_event trigger; starts out equal to alarm_raised_time.
	AlarmClearedTime      *time.Time                            `db:"alarm_cleared_time"`
	AlarmAcknowledgedTime *time.Time                            `db:"alarm_acknowledged_time"`
	AlarmAcknowledged     bool                                  `db:"alarm_acknowledged"`
	PerceivedSeverity     generated.PerceivedSeverity           `db:"perceived_severity"`
	Extensions            map[string]string                     `db:"extensions"`
	ObjectID              *uuid.UUID                            `db:"object_id"`               // Nullable: ACM may not provide the cluster ID. Track such cases manually and report them to ACM.
	ObjectTypeID          *uuid.UUID                            `db:"object_type_id"`          // Nullable: ACM may not provide the cluster ID. Track such cases manually and report them to ACM.
	NotificationEventType generated.AlarmSubscriptionInfoFilter `db:"notification_event_type"` // One of NEW, CHANGE, CLEAR, ACKNOWLEDGE; set by the manage_alarm_event trigger and matched against a subscription's filter.
	AlarmStatus           string                                `db:"alarm_status"`            // Either 'firing' or 'resolved' (enforced by the chk_status constraint).
	Fingerprint           string                                `db:"fingerprint"`             // Unique together with alarm_raised_time.
	AlarmSequenceNumber   int64                                 `db:"alarm_sequence_number"`   // Advanced by the manage_alarm_event trigger; compared with a subscription's event cursor.
}
EventCursor int64 `db:"event_cursor"` CreatedAt time.Time `db:"created_at"` diff --git a/internal/service/alarms/internal/db/models/converters.go b/internal/service/alarms/internal/db/models/converters.go index 590901aa..4a1f3c86 100644 --- a/internal/service/alarms/internal/db/models/converters.go +++ b/internal/service/alarms/internal/db/models/converters.go @@ -1,24 +1,83 @@ package models import ( + "fmt" + + "github.com/google/uuid" api "github.com/openshift-kni/oran-o2ims/internal/service/alarms/api/generated" ) // ConvertAlarmEventRecordModelToApi converts an AlarmEventRecord DB model to an API model func ConvertAlarmEventRecordModelToApi(aerModel AlarmEventRecord) api.AlarmEventRecord { - return api.AlarmEventRecord{ + record := api.AlarmEventRecord{ AlarmAcknowledged: aerModel.AlarmAcknowledged, AlarmAcknowledgedTime: aerModel.AlarmAcknowledgedTime, AlarmChangedTime: aerModel.AlarmChangedTime, AlarmClearedTime: aerModel.AlarmClearedTime, - AlarmDefinitionId: *aerModel.AlarmDefinitionID, AlarmEventRecordId: aerModel.AlarmEventRecordID, AlarmRaisedTime: aerModel.AlarmRaisedTime, PerceivedSeverity: aerModel.PerceivedSeverity, - ProbableCauseId: *aerModel.ProbableCauseID, - ResourceTypeID: *aerModel.ObjectTypeID, Extensions: aerModel.Extensions, } + + if aerModel.AlarmDefinitionID != nil { + record.AlarmDefinitionId = *aerModel.AlarmDefinitionID + } + if aerModel.ProbableCauseID != nil { + record.ProbableCauseId = *aerModel.ProbableCauseID + } + if aerModel.ObjectTypeID != nil { + record.ResourceTypeID = *aerModel.ObjectTypeID + } + + return record +} + +// convertAlarmEventRecordModelToAlarmEventNotification converts an AlarmEventRecord to api AlarmEventNotification +func convertAlarmEventRecordModelToAlarmEventNotification(aerModel AlarmEventRecord, subModel AlarmSubscription, globalCloudID uuid.UUID) api.AlarmEventNotification { + or := fmt.Sprintf("/o2ims-infrastructureMonitoring/v1/alarms/%v", aerModel.AlarmEventRecordID.String()) + + notification := 
api.AlarmEventNotification{ + AlarmAcknowledgeTime: aerModel.AlarmAcknowledgedTime, + AlarmAcknowledged: aerModel.AlarmAcknowledged, + AlarmEventRecordId: aerModel.AlarmEventRecordID, + AlarmRaisedTime: aerModel.AlarmRaisedTime, + ConsumerSubscriptionId: subModel.ConsumerSubscriptionID, + Extensions: aerModel.Extensions, + GlobalCloudID: globalCloudID, + NotificationEventType: AlarmFilterToEventType(aerModel.NotificationEventType), + ObjectRef: &or, + PerceivedSeverity: aerModel.PerceivedSeverity, + } + + // Handle all pointer fields together + if aerModel.AlarmChangedTime != nil { + notification.AlarmChangedTime = *aerModel.AlarmChangedTime + } + if aerModel.AlarmDefinitionID != nil { + notification.AlarmDefinitionID = *aerModel.AlarmDefinitionID + } + if aerModel.ProbableCauseID != nil { + notification.ProbableCauseID = *aerModel.ProbableCauseID + } + if aerModel.ObjectID != nil { + notification.ResourceID = *aerModel.ObjectID + } + if aerModel.ObjectTypeID != nil { + notification.ResourceTypeID = *aerModel.ObjectTypeID + } + + return notification +} + +// ConvertAlarmEventRecordModelsToAlarmEventNotifications converts multiple AlarmEventRecord DB to multiple api AlarmEventNotification +func ConvertAlarmEventRecordModelsToAlarmEventNotifications(aerModel []AlarmEventRecord, subModel AlarmSubscription, globalCloudID uuid.UUID) []api.AlarmEventNotification { + var res []api.AlarmEventNotification + for _, aer := range aerModel { + res = append(res, convertAlarmEventRecordModelToAlarmEventNotification(aer, subModel, globalCloudID)) + } + + return res } // ConvertServiceConfigurationToAPI converts an ServiceConfiguration DB model to an API model @@ -43,7 +102,7 @@ func ConvertSubscriptionModelToApi(subscriptionModel AlarmSubscription) api.Alar } if subscriptionModel.Filter != nil { - *apiModel.Filter = api.AlarmSubscriptionInfoFilter(*subscriptionModel.Filter) + apiModel.Filter = subscriptionModel.Filter } return apiModel @@ -54,6 +113,22 @@ func 
ConvertSubscriptionAPIToModel(subscriptionAPI *api.AlarmSubscriptionInfo) A return AlarmSubscription{ Callback: subscriptionAPI.Callback, ConsumerSubscriptionID: subscriptionAPI.ConsumerSubscriptionId, - Filter: (*string)(subscriptionAPI.Filter), + Filter: subscriptionAPI.Filter, + } +} + +// AlarmFilterToEventType map text to int e.g NEW -> 0 +func AlarmFilterToEventType(filter api.AlarmSubscriptionInfoFilter) api.AlarmEventNotificationNotificationEventType { + switch filter { + case api.AlarmSubscriptionInfoFilterNEW: + return api.AlarmEventNotificationNotificationEventTypeNEW + case api.AlarmSubscriptionInfoFilterCHANGE: + return api.AlarmEventNotificationNotificationEventTypeCHANGE + case api.AlarmSubscriptionInfoFilterCLEAR: + return api.AlarmEventNotificationNotificationEventTypeCLEAR + case api.AlarmSubscriptionInfoFilterACKNOWLEDGE: + return api.AlarmEventNotificationNotificationEventTypeACKNOWLEDGE + default: + return api.AlarmEventNotificationNotificationEventTypeNEW } } diff --git a/internal/service/alarms/internal/db/repo/alarms_repository.go b/internal/service/alarms/internal/db/repo/alarms_repository.go index 761eb88d..bd126b84 100644 --- a/internal/service/alarms/internal/db/repo/alarms_repository.go +++ b/internal/service/alarms/internal/db/repo/alarms_repository.go @@ -205,12 +205,12 @@ func (ar *AlarmsRepository) UpsertAlarmEventRecord(ctx context.Context, records return fmt.Errorf("failed to build query for event upsert: %w", err) } - _, err = ar.Db.Exec(ctx, sql, params...) + r, err := ar.Db.Exec(ctx, sql, params...) 
if err != nil { return fmt.Errorf("failed to execute upsert query: %w", err) } - slog.Info("Successfully inserted alerts from alertmanager", "table", models.AlarmEventRecord{}.TableName(), "count", len(records)) + slog.Info("Successfully inserted and updated alerts from alertmanager", "count", r.RowsAffected()) return nil } @@ -240,8 +240,8 @@ func buildAlarmEventRecordUpsertQuery(records []models.AlarmEventRecord) (string query.Apply(values...) // Set upsert constraints - // Cols here should match track_alarm_update trigger function. - // This will ensure alarm_changed_time is always updated as long as it has not been previously acked. + // Cols here should match manage_alarm_event trigger function. + // This will ensure alarm_changed_time, alarm_changed_time, alarm_sequence_number are always updated as long as it has not been previously acked. dbTags := utils.GetAllDBTagsFromStruct(m) query.Apply(im.OnConflictOnConstraint(m.OnConflict()).DoUpdate( im.SetExcluded(dbTags["AlarmStatus"]), @@ -357,3 +357,78 @@ func getGetAlertFingerPrintAndStartAt(am *api.AlertmanagerNotification) []bob.Ex return b } + +// GetAlarmsForSubscription for a given subscription get all alarms based on the sequence number and filter +func (ar *AlarmsRepository) GetAlarmsForSubscription(ctx context.Context, subscription models.AlarmSubscription) ([]models.AlarmEventRecord, error) { + m := models.AlarmEventRecord{} + dbTags := utils.GetAllDBTagsFromStruct(m) + queryMods := []bob.Mod[*dialect.SelectQuery]{ + sm.Columns(utils.GetColumnsAsAny(utils.GetColumns(m, []string{ + "AlarmEventRecordID", "AlarmDefinitionID", "ProbableCauseID", + "AlarmRaisedTime", "AlarmChangedTime", "AlarmClearedTime", + "AlarmAcknowledgedTime", "AlarmAcknowledged", "PerceivedSeverity", + "Extensions", "ObjectID", "ObjectTypeID", + "NotificationEventType", "AlarmSequenceNumber", + }))...), + sm.From(m.TableName()), + } + + // Start with the base condition + // Collect all the events that the subscriber has not yet been 
notified before + whereClause := psql.Quote(dbTags["AlarmSequenceNumber"]).GT(psql.Arg(subscription.EventCursor)) + // If we have a filter, add it to the WHERE clause + if subscription.Filter != nil { + whereClause = psql.And( + whereClause, + psql.Quote(dbTags["NotificationEventType"]).NE(psql.Arg(subscription.Filter)), + ) + } + // Add WHERE and ORDER BY clauses + queryMods = append(queryMods, + sm.Where(whereClause), + sm.OrderBy(dbTags["AlarmSequenceNumber"]).Asc(), + ) + + // Build final query + query := psql.Select(queryMods...) + + sql, params, err := query.Build() + if err != nil { + return []models.AlarmEventRecord{}, fmt.Errorf("failed to build GetAlarmsForSubscription query: %w", err) + } + + records, err := utils.ExecuteCollectRows[models.AlarmEventRecord](ctx, ar.Db, sql, params) + if err != nil { + return []models.AlarmEventRecord{}, fmt.Errorf("failed to execute GetAlarmsForSubscription query: %w", err) + } + + if len(records) > 0 { + slog.Info("Successfully got alarms for subscription", "alarm count", len(records), "Subscription", subscription.SubscriptionID) + } + return records, nil +} + +// UpdateSubscriptionEventCursor update a given subscription event cursor with a alarm sequence value +func (ar *AlarmsRepository) UpdateSubscriptionEventCursor(ctx context.Context, subscription models.AlarmSubscription, latestSequence int64) error { + m := models.AlarmSubscription{} + dbTags := utils.GetAllDBTagsFromStruct(m) + query := psql.Update( + um.Table(m.TableName()), + um.SetCol(dbTags["EventCursor"]).ToArg(latestSequence), + um.Where(psql.Quote(dbTags["SubscriptionID"]).EQ(psql.Arg(subscription.SubscriptionID))), + um.Returning(m.PrimaryKey()), + ) + + sql, params, err := query.Build() + if err != nil { + return fmt.Errorf("failed to build UpdateSubscriptionEventCursor query: %w", err) + } + + record, err := utils.ExecuteCollectExactlyOneRow[models.AlarmSubscription](ctx, ar.Db, sql, params) + if err != nil { + return fmt.Errorf("failed to execute 
UpdateSubscriptionEventCursor query: %w", err) + } + + slog.Info("Successfully updated subscription with new event cursor", "subscription", subscription.SubscriptionID, "from", subscription.EventCursor, "to", record.EventCursor) + return nil +} diff --git a/internal/service/alarms/serve.go b/internal/service/alarms/serve.go index 7b8571a0..f8bccf5b 100644 --- a/internal/service/alarms/serve.go +++ b/internal/service/alarms/serve.go @@ -80,9 +80,7 @@ func Serve(config *api.AlarmsServerConfig) error { Db: pool, } - // TODO: Audit and Insert data database - - // TODO: Launch k8s job for DB remove archived data + // TODO: Launch k8s job for DB remove resolved data // Parse global cloud id var globalCloudID uuid.UUID @@ -91,6 +89,9 @@ func Serve(config *api.AlarmsServerConfig) error { if err != nil { return fmt.Errorf("failed to parse global cloud id: %w", err) } + } else { + slog.Debug("No globalCloudID set by the operator, generating one instead") + globalCloudID = uuid.New() } // Add Alarm Service Configuration to the database @@ -102,9 +103,23 @@ func Serve(config *api.AlarmsServerConfig) error { // Init server // Create the handler + // TODO: fill in the oauth attributes from the SMO config passed to the server + oc, err := utils.SetupOAuthClient(ctx, utils.OAuthClientConfig{}) + if err != nil { + return fmt.Errorf("failed to setup oauth client: %w", err) + } + + // Create a new logger to be passed to things that need a logger + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + AddSource: true, + Level: slog.LevelDebug, + })) + alarmServer := api.AlarmsServer{ GlobalCloudID: globalCloudID, AlarmsRepository: alarmRepository, + HttpClient: oc, + Logger: logger, } if err := UpdateAlarmDictionaryAndAlarmsDefinitionData(ctx, hubClient, &alarmServer); err != nil { @@ -124,12 +139,6 @@ func Serve(config *api.AlarmsServerConfig) error { // Register a default handler that replies with 404 so that we can override the response format r.HandleFunc("/", 
common.NotFoundFunc()) - // Create a new logger to be passed to things that need a logger - logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ - AddSource: true, - Level: slog.LevelDebug, // TODO: set with server args - })) - // This also validates the spec file swagger, err := generated.GetSwagger() if err != nil { @@ -137,7 +146,7 @@ func Serve(config *api.AlarmsServerConfig) error { } // Create a response filter filterAdapter that can support the 'filter' and '*fields' query parameters - filterAdapter, err := common.NewFilterAdapter(logger, swagger) + filterAdapter, err := common.NewFilterAdapter(alarmServer.Logger, swagger) if err != nil { return fmt.Errorf("error creating filter filterAdapter: %w", err) } diff --git a/internal/service/common/notifier/event.go b/internal/service/common/notifier/event.go index 55af9f47..6626edaa 100644 --- a/internal/service/common/notifier/event.go +++ b/internal/service/common/notifier/event.go @@ -4,9 +4,12 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "log/slog" + "net" "net/http" + "syscall" "time" "github.com/google/uuid" @@ -30,6 +33,7 @@ func sendNotification(ctx context.Context, client *http.Client, url string, even if err != nil { return fmt.Errorf("failed to send notification: %w", err) } + defer response.Body.Close() if response.StatusCode != http.StatusOK { return fmt.Errorf("notification failed: %v", response.StatusCode) @@ -46,26 +50,45 @@ func processEvent(ctx context.Context, logger *slog.Logger, client *http.Client, logger.Info("processing data change event", "notificationID", event.NotificationID, "sequenceID", event.SequenceID) + _ = CallUrl(ctx, logger, client, url, event) + + completionChannel <- &SubscriptionJobComplete{ + subscriptionID: subscriptionID, + notificationID: event.NotificationID, + sequenceID: event.SequenceID, + } +} + +func CallUrl(ctx context.Context, logger *slog.Logger, client *http.Client, url string, event Notification) error { var err error = nil 
delay := retryDelay - for i := 0; i < maxRetries; i++ { + for attempt := 0; attempt < maxRetries; attempt++ { err = sendNotification(ctx, client, url, event) if err == nil { break } + if nonRetryable, nonRetryErr := isNonRetryableError(err); nonRetryable { + msg := "failed to send notification due a non retryable error" + logger.Error(msg, "error", nonRetryErr, + "notificationID", event.NotificationID, "sequenceID", event.SequenceID) + return fmt.Errorf("%s: %w", msg, err) + } + logger.Warn("failed to send notification", "error", err, - "notificationID", event.NotificationID, "sequenceID", event.SequenceID, "delay", delay) + "notificationID", event.NotificationID, "sequenceID", event.SequenceID, "delay", delay.String()) - select { - case <-ctx.Done(): - logger.Warn("context canceled while sending notification", - "notificationID", event.NotificationID, "sequenceID", event.SequenceID) - break - case <-time.After(delay): - delay *= 2 - logger.Debug("retrying notification", - "notificationID", event.NotificationID, "sequenceID", event.SequenceID) + if attempt < maxRetries-1 { + select { + case <-ctx.Done(): + logger.Warn("context canceled while sending notification", + "notificationID", event.NotificationID, "sequenceID", event.SequenceID) + break + case <-time.After(delay): + delay *= 2 + logger.Debug("retrying notification", + "notificationID", event.NotificationID, "sequenceID", event.SequenceID) + } } } @@ -74,14 +97,23 @@ func processEvent(ctx context.Context, logger *slog.Logger, client *http.Client, "notificationID", event.NotificationID, "sequenceID", event.SequenceID) // TODO: If we were able to send this one then we are not likely to be able to send any // of the others so perhaps we should purge our queue, or enter a longer backoff period. 
// isNonRetryableError reports whether err is a failure that retrying cannot
// fix. It returns true together with a descriptive error wrapping err when the
// chain contains a DNS "not found" error or a refused connection; otherwise it
// returns false and a nil error.
func isNonRetryableError(err error) (bool, error) {
	var lookupErr *net.DNSError

	switch {
	case errors.As(err, &lookupErr) && lookupErr.IsNotFound:
		return true, fmt.Errorf("host not found: %w", err)
	case errors.Is(err, syscall.ECONNREFUSED):
		return true, fmt.Errorf("connection refused: %w", err)
	default:
		return false, nil
	}
}