99#include "event.h"
1010#include "errors.h"
1111
12- typedef struct {
13- int64 id ;
14- StringInfo body ;
15- struct curl_slist * request_headers ;
16- int32 timeout_milliseconds ;
17- } CurlData ;
18-
1912static SPIPlanPtr del_response_plan = NULL ;
2013static SPIPlanPtr del_return_queue_plan = NULL ;
2114static SPIPlanPtr ins_response_plan = NULL ;
@@ -52,18 +45,15 @@ static struct curl_slist *pg_text_array_to_slist(ArrayType *array,
5245 return headers ;
5346}
5447
55- // We need a different memory context here, as the parent function will have an SPI memory context, which has a shorter lifetime.
56- static void init_curl_handle (CURLM * curl_mhandle , MemoryContext curl_memctx , int64 id , Datum urlBin , NullableDatum bodyBin , NullableDatum headersBin , Datum methodBin , int32 timeout_milliseconds ){
57- MemoryContext old_ctx = MemoryContextSwitchTo (curl_memctx );
58-
59- CurlData * cdata = palloc (sizeof (CurlData ));
60- cdata -> id = id ;
48+ void init_curl_handle (CurlData * cdata , RequestQueueRow row ){
49+ cdata -> id = row .id ;
6150 cdata -> body = makeStringInfo ();
51+ cdata -> ez_handle = curl_easy_init ();
6252
63- cdata -> timeout_milliseconds = timeout_milliseconds ;
53+ cdata -> timeout_milliseconds = row . timeout_milliseconds ;
6454
65- if (!headersBin .isnull ) {
66- ArrayType * pgHeaders = DatumGetArrayTypeP (headersBin .value );
55+ if (!row . headersBin .isnull ) {
56+ ArrayType * pgHeaders = DatumGetArrayTypeP (row . headersBin .value );
6757 struct curl_slist * request_headers = NULL ;
6858
6959 request_headers = pg_text_array_to_slist (pgHeaders , request_headers );
@@ -73,64 +63,55 @@ static void init_curl_handle(CURLM *curl_mhandle, MemoryContext curl_memctx, int
7363 cdata -> request_headers = request_headers ;
7464 }
7565
76- char * url = TextDatumGetCString (urlBin );
66+ cdata -> url = TextDatumGetCString (row . url );
7767
78- char * reqBody = !bodyBin .isnull ? TextDatumGetCString (bodyBin .value ) : NULL ;
68+ cdata -> req_body = !row . bodyBin .isnull ? TextDatumGetCString (row . bodyBin .value ) : NULL ;
7969
80- char * method = TextDatumGetCString (methodBin );
81- if (strcasecmp (method , "GET" ) != 0 && strcasecmp (method , "POST" ) != 0 && strcasecmp (method , "DELETE" ) != 0 ) {
82- ereport (ERROR , errmsg ("Unsupported request method %s" , method ));
83- }
70+ cdata -> method = TextDatumGetCString (row .method );
8471
85- CURL * curl_ez_handle = curl_easy_init ();
86- if (! curl_ez_handle )
87- ereport ( ERROR , errmsg ( "curl_easy_init()" ));
72+ if ( strcasecmp ( cdata -> method , "GET" ) != 0 && strcasecmp ( cdata -> method , "POST" ) != 0 && strcasecmp ( cdata -> method , "DELETE" ) != 0 ) {
73+ ereport ( ERROR , errmsg ( "Unsupported request method %s" , cdata -> method ));
74+ }
8875
89- if (strcasecmp (method , "GET" ) == 0 ) {
90- if (reqBody ) {
91- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_POSTFIELDS , reqBody );
92- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_CUSTOMREQUEST , "GET" );
76+ if (strcasecmp (cdata -> method , "GET" ) == 0 ) {
77+ if (cdata -> req_body ) {
78+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_POSTFIELDS , cdata -> req_body );
79+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_CUSTOMREQUEST , "GET" );
9380 }
9481 }
9582
96- if (strcasecmp (method , "POST" ) == 0 ) {
97- if (reqBody ) {
98- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_POSTFIELDS , reqBody );
83+ if (strcasecmp (cdata -> method , "POST" ) == 0 ) {
84+ if (cdata -> req_body ) {
85+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_POSTFIELDS , cdata -> req_body );
9986 }
10087 else {
101- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_POST , 1L );
102- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_POSTFIELDSIZE , 0L );
88+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_POST , 1L );
89+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_POSTFIELDSIZE , 0L );
10390 }
10491 }
10592
106- if (strcasecmp (method , "DELETE" ) == 0 ) {
107- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_CUSTOMREQUEST , "DELETE" );
108- if (reqBody ) {
109- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_POSTFIELDS , reqBody );
93+ if (strcasecmp (cdata -> method , "DELETE" ) == 0 ) {
94+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_CUSTOMREQUEST , "DELETE" );
95+ if (cdata -> req_body ) {
96+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_POSTFIELDS , cdata -> req_body );
11097 }
11198 }
11299
113- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_WRITEFUNCTION , body_cb );
114- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_WRITEDATA , cdata );
115- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_HEADER , 0L );
116- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_URL , url );
117- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_HTTPHEADER , cdata -> request_headers );
118- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_TIMEOUT_MS , (long ) cdata -> timeout_milliseconds );
119- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_PRIVATE , cdata );
120- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_FOLLOWLOCATION , (long ) true);
100+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_WRITEFUNCTION , body_cb );
101+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_WRITEDATA , cdata );
102+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_HEADER , 0L );
103+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_URL , cdata -> url );
104+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_HTTPHEADER , cdata -> request_headers );
105+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_TIMEOUT_MS , (long ) cdata -> timeout_milliseconds );
106+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_PRIVATE , cdata );
107+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_FOLLOWLOCATION , (long ) true);
121108 if (log_min_messages <= DEBUG2 )
122- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_VERBOSE , 1L );
109+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_VERBOSE , 1L );
123110#if LIBCURL_VERSION_NUM >= 0x075500 /* libcurl 7.85.0 */
124- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_PROTOCOLS_STR , "http,https" );
111+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_PROTOCOLS_STR , "http,https" );
125112#else
126- EREPORT_CURL_SETOPT (curl_ez_handle , CURLOPT_PROTOCOLS , CURLPROTO_HTTP | CURLPROTO_HTTPS );
113+ EREPORT_CURL_SETOPT (cdata -> ez_handle , CURLOPT_PROTOCOLS , CURLPROTO_HTTP | CURLPROTO_HTTPS );
127114#endif
128-
129- EREPORT_MULTI (
130- curl_multi_add_handle (curl_mhandle , curl_ez_handle )
131- );
132-
133- MemoryContextSwitchTo (old_ctx );
134115}
135116
136117void set_curl_mhandle (WorkerState * wstate ){
@@ -141,8 +122,6 @@ void set_curl_mhandle(WorkerState *wstate){
141122}
142123
143124uint64 delete_expired_responses (char * ttl , int batch_size ){
144- SPI_connect ();
145-
146125 if (del_response_plan == NULL ) {
147126 SPIPlanPtr tmp = SPI_prepare ("\
148127 WITH\
@@ -178,14 +157,10 @@ uint64 delete_expired_responses(char *ttl, int batch_size){
178157 ereport (ERROR , errmsg ("Error expiring response table rows: %s" , SPI_result_code_string (ret_code )));
179158 }
180159
181- SPI_finish ();
182-
183160 return affected_rows ;
184161}
185162
186- uint64 consume_request_queue (CURLM * curl_mhandle , int batch_size , MemoryContext curl_memctx ){
187- SPI_connect ();
188-
163+ uint64 consume_request_queue (const int batch_size ){
189164 if (del_return_queue_plan == NULL ) {
190165 SPIPlanPtr tmp = SPI_prepare ("\
191166 WITH\
@@ -214,47 +189,40 @@ uint64 consume_request_queue(CURLM *curl_mhandle, int batch_size, MemoryContext
214189 if (ret_code != SPI_OK_DELETE_RETURNING )
215190 ereport (ERROR , errmsg ("Error getting http request queue: %s" , SPI_result_code_string (ret_code )));
216191
217- uint64 affected_rows = SPI_processed ;
218-
219- for (size_t j = 0 ; j < affected_rows ; j ++ ) {
220- bool tupIsNull = false;
221-
222- int64 id = DatumGetInt64 (SPI_getbinval (SPI_tuptable -> vals [j ], SPI_tuptable -> tupdesc , 1 , & tupIsNull ));
223- EREPORT_NULL_ATTR (tupIsNull , id );
192+ return SPI_processed ;
193+ }
224194
225- int32 timeout_milliseconds = DatumGetInt32 (SPI_getbinval (SPI_tuptable -> vals [j ], SPI_tuptable -> tupdesc , 4 , & tupIsNull ));
226- EREPORT_NULL_ATTR (tupIsNull , timeout_milliseconds );
195+ // This has an implicit dependency on the execution of delete_return_request_queue,
196+ // unfortunately we're not able to make this dependency explicit
197+ // due to the design of SPI (which uses global variables)
198+ RequestQueueRow get_request_queue_row (HeapTuple spi_tupval , TupleDesc spi_tupdesc ){
199+ bool tupIsNull = false;
227200
228- Datum method = SPI_getbinval (SPI_tuptable -> vals [ j ], SPI_tuptable -> tupdesc , 2 , & tupIsNull );
229- EREPORT_NULL_ATTR (tupIsNull , method );
201+ int64 id = DatumGetInt64 ( SPI_getbinval (spi_tupval , spi_tupdesc , 1 , & tupIsNull ) );
202+ EREPORT_NULL_ATTR (tupIsNull , id );
230203
231- Datum url = SPI_getbinval (SPI_tuptable -> vals [ j ], SPI_tuptable -> tupdesc , 3 , & tupIsNull );
232- EREPORT_NULL_ATTR (tupIsNull , url );
204+ Datum method = SPI_getbinval (spi_tupval , spi_tupdesc , 2 , & tupIsNull );
205+ EREPORT_NULL_ATTR (tupIsNull , method );
233206
234- NullableDatum headersBin = {
235- .value = SPI_getbinval (SPI_tuptable -> vals [j ], SPI_tuptable -> tupdesc , 5 , & tupIsNull ),
236- .isnull = tupIsNull
237- };
207+ Datum url = SPI_getbinval (spi_tupval , spi_tupdesc , 3 , & tupIsNull );
208+ EREPORT_NULL_ATTR (tupIsNull , url );
238209
239- NullableDatum bodyBin = {
240- .value = SPI_getbinval (SPI_tuptable -> vals [j ], SPI_tuptable -> tupdesc , 6 , & tupIsNull ),
241- .isnull = tupIsNull
242- };
210+ int32 timeout_milliseconds = DatumGetInt32 (SPI_getbinval (spi_tupval , spi_tupdesc , 4 , & tupIsNull ));
211+ EREPORT_NULL_ATTR (tupIsNull , timeout_milliseconds );
243212
244- init_curl_handle (curl_mhandle , curl_memctx , id , url , bodyBin , headersBin , method , timeout_milliseconds );
245- }
213+ NullableDatum headersBin = {
214+ .value = SPI_getbinval (spi_tupval , spi_tupdesc , 5 , & tupIsNull ),
215+ .isnull = tupIsNull
216+ };
246217
247- SPI_finish ();
218+ NullableDatum bodyBin = {
219+ .value = SPI_getbinval (spi_tupval , spi_tupdesc , 6 , & tupIsNull ),
220+ .isnull = tupIsNull
221+ };
248222
249- return affected_rows ;
250- }
251-
252- static void pfree_curl_data (CurlData * cdata ){
253- if (cdata -> body ){
254- destroyStringInfo (cdata -> body );
255- }
256- if (cdata -> request_headers ) //curl_slist_free_all already handles the NULL case, but be explicit about it
257- curl_slist_free_all (cdata -> request_headers );
223+ return (RequestQueueRow ){
224+ id , method , url , timeout_milliseconds , headersBin , bodyBin
225+ };
258226}
259227
260228static Jsonb * jsonb_headers_from_curl_handle (CURL * ez_handle ){
@@ -276,11 +244,14 @@ static Jsonb *jsonb_headers_from_curl_handle(CURL *ez_handle){
276244 return jsonb_headers ;
277245}
278246
279- static void insert_response (CURL * ez_handle , CurlData * cdata , CURLcode curl_return_code ){
247+ void insert_response (CURL * ez_handle , CURLcode curl_return_code ){
280248 enum { nparams = 7 }; // using an enum because const size_t nparams doesn't compile
281249 Datum vals [nparams ];
282250 char nulls [nparams ]; MemSet (nulls , 'n' , nparams );
283251
252+ CurlData * cdata = NULL ;
253+ EREPORT_CURL_GETINFO (ez_handle , CURLINFO_PRIVATE , & cdata );
254+
284255 vals [0 ] = Int64GetDatum (cdata -> id );
285256 nulls [0 ] = ' ' ;
286257
@@ -352,36 +323,15 @@ static void insert_response(CURL *ez_handle, CurlData *cdata, CURLcode curl_retu
352323 }
353324}
354325
355- // Switch back to the curl memory context, which has the curl handles stored
356- void insert_curl_responses (WorkerState * wstate , MemoryContext curl_memctx ){
357- MemoryContext old_ctx = MemoryContextSwitchTo (curl_memctx );
358- int msgs_left = 0 ;
359- CURLMsg * msg = NULL ;
360- CURLM * curl_mhandle = wstate -> curl_mhandle ;
361-
362- while ((msg = curl_multi_info_read (curl_mhandle , & msgs_left ))) {
363- if (msg -> msg == CURLMSG_DONE ) {
364- CURLcode return_code = msg -> data .result ;
365- CURL * ez_handle = msg -> easy_handle ;
366- CurlData * cdata = NULL ;
367- EREPORT_CURL_GETINFO (ez_handle , CURLINFO_PRIVATE , & cdata );
368-
369- SPI_connect ();
370- insert_response (ez_handle , cdata , return_code );
371- SPI_finish ();
372-
373- pfree_curl_data (cdata );
326+ void pfree_curl_data (CurlData * cdata ){
327+ pfree (cdata -> url );
328+ pfree (cdata -> method );
329+ if (cdata -> req_body )
330+ pfree (cdata -> req_body );
374331
375- int res = curl_multi_remove_handle (curl_mhandle , ez_handle );
376- if (res != CURLM_OK )
377- ereport (ERROR , errmsg ("curl_multi_remove_handle: %s" , curl_multi_strerror (res )));
378-
379- curl_easy_cleanup (ez_handle );
380- } else {
381- ereport (ERROR , errmsg ("curl_multi_info_read(), CURLMsg=%d\n" , msg -> msg ));
382- }
383- }
332+ if (cdata -> body )
333+ destroyStringInfo (cdata -> body );
384334
385- MemoryContextSwitchTo (old_ctx );
335+ if (cdata -> request_headers ) //curl_slist_free_all already handles the NULL case, but be explicit about it
336+ curl_slist_free_all (cdata -> request_headers );
386337}
387-
0 commit comments