@@ -31,6 +31,7 @@ StraxFormatter::StraxFormatter(std::shared_ptr<Options>& opts, std::shared_ptr<M
31
31
fChunkLength = long (fOptions ->GetDouble (" strax_chunk_length" , 5 )*1e9 ); // default 5s
32
32
fChunkOverlap = long (fOptions ->GetDouble (" strax_chunk_overlap" , 0.5 )*1e9 ); // default 0.5s
33
33
fFragmentBytes = fOptions ->GetInt (" strax_fragment_payload_bytes" , 110 *2 );
34
+ FullFragmentSize = fFragmentBytes + fStraxHeaderSize ;
34
35
fCompressor = fOptions ->GetString (" compressor" , " lz4" );
35
36
fFullChunkLength = fChunkLength +fChunkOverlap ;
36
37
fHostname = fOptions ->Hostname ();
@@ -39,6 +40,7 @@ StraxFormatter::StraxFormatter(std::shared_ptr<Options>& opts, std::shared_ptr<M
39
40
if (run_num == -1 ) run_name = " run" ;
40
41
else {
41
42
run_name = std::to_string (run_num);
43
+ // chunk names are 6 digits long
42
44
if (run_name.size () < 6 ) run_name.insert (0 , 6 - run_name.size (), int (' 0' ));
43
45
}
44
46
@@ -97,24 +99,21 @@ void StraxFormatter::GetDataPerChan(std::map<int, int>& ret) {
97
99
98
100
void StraxFormatter::GenerateArtificialDeadtime (int64_t timestamp, const std::shared_ptr<V1724>& digi) {
99
101
std::string fragment;
100
- fragment.reserve (fFragmentBytes + fStraxHeaderSize );
101
- timestamp *= digi->GetClockWidth ();
102
- fragment.append ((char *)×tamp, sizeof (timestamp));
102
+ fragment.reserve (fFullFragmentSize );
103
+ timestamp *= digi->GetClockWidth (); // TODO nv
103
104
int32_t length = fFragmentBytes >>1 ;
105
+ int16_t sw = digi->SampleWidth (), channel = digi->GetADChannel (), zero = 0 ;
106
+ fragment.append ((char *)×tamp, sizeof (timestamp));
104
107
fragment.append ((char *)&length, sizeof (length));
105
- int16_t sw = digi->SampleWidth ();
106
108
fragment.append ((char *)&sw, sizeof (sw));
107
- int16_t channel = 790 ; // TODO add MV and NV support
108
109
fragment.append ((char *)&channel, sizeof (channel));
109
110
fragment.append ((char *)&length, sizeof (length));
110
- int16_t fragment_i = 0 ;
111
- fragment.append ((char *)&fragment_i, sizeof (fragment_i));
112
- int16_t baseline = 0 ;
113
- fragment.append ((char *)&baseline, sizeof (baseline));
114
- int16_t zero = 0 ;
111
+ fragment.append ((char *)&zero, sizeof (zero)); // fragment_i
112
+ fragment.append ((char *)&zero, sizeof (zero)); // baseline
115
113
for (; length > 0 ; length--)
116
- fragment.append ((char *)&zero, sizeof (zero));
114
+ fragment.append ((char *)&zero, sizeof (zero)); // wf
117
115
AddFragmentToBuffer (std::move (fragment), 0 , 0 );
116
+ return ;
118
117
}
119
118
120
119
void StraxFormatter::ProcessDatapacket (std::unique_ptr<data_packet> dp){
@@ -176,8 +175,9 @@ int StraxFormatter::ProcessEvent(std::u32string_view buff,
176
175
buff.remove_prefix (event_header_words);
177
176
int ret;
178
177
int frags (0 );
178
+ unsigned n_chan = dp->digi ->GetNumChannels ();
179
179
180
- for (unsigned ch=0 ; ch<max_channels ; ch++){
180
+ for (unsigned ch=0 ; ch<n_chan ; ch++){
181
181
if (channel_mask & (1 <<ch)) {
182
182
clock_gettime (CLOCK_THREAD_CPUTIME_ID, &ch_start);
183
183
ret = ProcessChannel (buff, words, channel_mask, event_time, frags, ch, dp, dpc);
@@ -210,16 +210,19 @@ int StraxFormatter::ProcessChannel(std::u32string_view buff, int words_in_event,
210
210
211
211
int num_frags = std::ceil (1 .*samples_in_pulse/samples_per_frag);
212
212
frags += num_frags;
213
+ int32_t samples_this_frag = 0 ;
214
+ int64_t time_this_frag = 0 ;
215
+ const uint16_t filler = 0 ;
213
216
for (uint16_t frag_i = 0 ; frag_i < num_frags; frag_i++) {
214
217
std::string fragment;
215
- fragment.reserve (fFragmentBytes + fStraxHeaderSize );
218
+ fragment.reserve (fFullFragmentSize );
216
219
217
220
// How long is this fragment?
218
- int32_t samples_this_frag = samples_per_frag;
221
+ samples_this_frag = samples_per_frag;
219
222
if (frag_i == num_frags-1 )
220
223
samples_this_frag = samples_in_pulse - frag_i*samples_per_frag;
221
224
222
- int64_t time_this_frag = timestamp + samples_per_frag*sw*frag_i;
225
+ time_this_frag = timestamp + samples_per_frag*sw*frag_i;
223
226
fragment.append ((char *)&time_this_frag, sizeof (time_this_frag));
224
227
fragment.append ((char *)&samples_this_frag, sizeof (samples_this_frag));
225
228
fragment.append ((char *)&sw, sizeof (sw));
@@ -231,7 +234,6 @@ int StraxFormatter::ProcessChannel(std::u32string_view buff, int words_in_event,
231
234
// Copy the raw buffer
232
235
fragment.append ((char *)wf.data (), samples_this_frag*sizeof (uint16_t ));
233
236
wf.remove_prefix (samples_this_frag*sizeof (uint16_t )/sizeof (char32_t ));
234
- uint16_t zero_filler = 0 ;
235
237
for (; samples_this_frag < samples_per_frag; samples_this_frag++)
236
238
fragment.append ((char *)&zero_filler, sizeof (zero_filler));
237
239
@@ -264,7 +266,7 @@ void StraxFormatter::AddFragmentToBuffer(std::string fragment, uint32_t ts, int
264
266
fThreadId , chunk_id - max_chunk - 1 , channel);
265
267
}
266
268
267
- fOutputBufferSize += fFragmentBytes + fStraxHeaderSize ;
269
+ fOutputBufferSize += fFullFragmentSize ;
268
270
269
271
if (!overlap){
270
272
fChunks [chunk_id].emplace_back (std::move (fragment));
@@ -273,11 +275,12 @@ void StraxFormatter::AddFragmentToBuffer(std::string fragment, uint32_t ts, int
273
275
}
274
276
}
275
277
276
- void StraxFormatter::ReceiveDatapackets (std::list<std::unique_ptr<data_packet>>& in) {
278
+ void StraxFormatter::ReceiveDatapackets (std::list<std::unique_ptr<data_packet>>& in, int bytes ) {
277
279
{
278
280
const std::lock_guard<std::mutex> lk (fBufferMutex );
279
281
fBufferCounter [in.size ()]++;
280
282
fBuffer .splice (fBuffer .end (), in);
283
+ fInputBufferSize += bytes;
281
284
}
282
285
fCV .notify_one ();
283
286
}
@@ -332,7 +335,7 @@ void StraxFormatter::WriteOutChunk(int chunk_i){
332
335
333
336
for (int i = 0 ; i < 2 ; i++) {
334
337
if (buffers[i]->size () == 0 ) continue ;
335
- uncompressed_size[i] = buffers[i]->size ()*( fFragmentBytes + fStraxHeaderSize ) ;
338
+ uncompressed_size[i] = buffers[i]->size ()*fFullFragmentSize ;
336
339
uncompressed.reserve (uncompressed_size[i]);
337
340
for (auto it = buffers[i]->begin (); it != buffers[i]->end (); it++)
338
341
uncompressed += *it; // std::accumulate would be nice but 3x slower without -O2
0 commit comments