Skip to content

Commit

Permalink
Add a new dummy comression called NULL
Browse files Browse the repository at this point in the history
This fixes bug #7714 where adding a column with a default
value (jargon: missing value) and a compressed batch with
all nulls created an ambiguity. In the all null cases the
compressed block was stored as a NULL value.

With this change, I introduce a new special compression
type, the 'NULL' compression which is a single byte place
holder for an 'all-null' compressed block. This allows us
to distinguish between the missing value vs the all-null
values.

Please note that the wrong results impacted existing tests
so I updated the expected results, as well as I added
reference queries before compression to prove the updated
values were wrong before.

A new debug only GUC was added for testing a future upgrade
script, which will arrive as a separate PR.

Fixes #7714
  • Loading branch information
dbeck committed Mar 6, 2025
1 parent b3ff0c5 commit 899adf1
Show file tree
Hide file tree
Showing 25 changed files with 1,906 additions and 553 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7798
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes: #7714 Fixes a wrong result when compressed NULL values were confused with default values. This happened in very special circumstances with alter table added a new column with a default value, an update and compression in a very particular order.
5 changes: 3 additions & 2 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ CREATE FUNCTION _timescaledb_functions.compressed_data_has_nulls(_timescaledb_in
AS '@MODULE_PATHNAME@', 'ts_update_placeholder';

INSERT INTO _timescaledb_catalog.compression_algorithm( id, version, name, description) values
( 5, 1, 'COMPRESSION_ALGORITHM_BOOL', 'bool');

( 5, 1, 'COMPRESSION_ALGORITHM_BOOL', 'bool'),
( 6, 1, 'COMPRESSION_ALGORITHM_NULL', 'null')
;

-------------------------------
-- Update compression settings
Expand Down
1 change: 1 addition & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
DROP FUNCTION IF EXISTS _timescaledb_functions.compressed_data_has_nulls(_timescaledb_internal.compressed_data);

DELETE FROM _timescaledb_catalog.compression_algorithm WHERE id = 5 AND version = 1 AND name = 'COMPRESSION_ALGORITHM_BOOL';
DELETE FROM _timescaledb_catalog.compression_algorithm WHERE id = 6 AND version = 1 AND name = 'COMPRESSION_ALGORITHM_NULL';

-- Update compression settings
CREATE TABLE _timescaledb_catalog.tempsettings (LIKE _timescaledb_catalog.compression_settings);
Expand Down
16 changes: 16 additions & 0 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ bool ts_guc_enable_chunk_skipping = false;
TSDLLEXPORT bool ts_guc_enable_segmentwise_recompression = true;
TSDLLEXPORT bool ts_guc_enable_bool_compression = false;

/* Only settable in debug mode for testing */
TSDLLEXPORT bool ts_guc_enable_null_compression = true;

/* Enable of disable columnar scans for columnar-oriented storage engines. If
* disabled, regular sequence scans will be used instead. */
TSDLLEXPORT bool ts_guc_enable_columnarscan = true;
Expand Down Expand Up @@ -761,6 +764,19 @@ _guc_init(void)
NULL,
NULL);

#ifdef TS_DEBUG
DefineCustomBoolVariable(MAKE_EXTOPTION("enable_null_compression"),
"Debug only flag to enable NULL compression",
"Enable null compression",
&ts_guc_enable_null_compression,
true,
PGC_USERSET,
0,
NULL,
NULL,
NULL);
#endif

/*
* Define the limit on number of invalidation-based refreshes we allow per
* refresh call. If this limit is exceeded, fall back to a single refresh that
Expand Down
3 changes: 3 additions & 0 deletions src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ extern bool ts_guc_enable_chunk_skipping;
extern TSDLLEXPORT bool ts_guc_enable_segmentwise_recompression;
extern TSDLLEXPORT bool ts_guc_enable_bool_compression;

/* Only settable in debug mode for testing */
extern TSDLLEXPORT bool ts_guc_enable_null_compression;

#ifdef USE_TELEMETRY
typedef enum TelemetryLevel
{
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/compression/algorithms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/deltadelta.c
${CMAKE_CURRENT_SOURCE_DIR}/dictionary.c
${CMAKE_CURRENT_SOURCE_DIR}/gorilla.c
${CMAKE_CURRENT_SOURCE_DIR}/bool_compress.c)
${CMAKE_CURRENT_SOURCE_DIR}/bool_compress.c
${CMAKE_CURRENT_SOURCE_DIR}/null.c)
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
57 changes: 57 additions & 0 deletions tsl/src/compression/algorithms/null.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/

#include "null.h"
#include "fmgr.h"

typedef struct NullCompressed
{
CompressedDataHeaderFields;
} NullCompressed;

extern DecompressionIterator *
null_decompression_iterator_from_datum_forward(Datum bool_compressed, Oid element_type)

Check warning on line 16 in tsl/src/compression/algorithms/null.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/compression/algorithms/null.c#L16

Added line #L16 was not covered by tests
{
elog(ERROR, "null decompression iterator not implemented");
return NULL;
}

extern DecompressionIterator *
null_decompression_iterator_from_datum_reverse(Datum bool_compressed, Oid element_type)

Check warning on line 23 in tsl/src/compression/algorithms/null.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/compression/algorithms/null.c#L23

Added line #L23 was not covered by tests
{
elog(ERROR, "null decompression iterator not implemented");
return NULL;
}

extern void
null_compressed_send(CompressedDataHeader *header, StringInfo buffer)

Check warning on line 30 in tsl/src/compression/algorithms/null.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/compression/algorithms/null.c#L30

Added line #L30 was not covered by tests
{
elog(ERROR, "null compression doesn't implement send");
}

extern Datum
null_compressed_recv(StringInfo buffer)

Check warning on line 36 in tsl/src/compression/algorithms/null.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/compression/algorithms/null.c#L36

Added line #L36 was not covered by tests
{
elog(ERROR, "null compression doesn't implement recv");
PG_RETURN_VOID();
}

extern Compressor *
null_compressor_for_type(Oid element_type)

Check warning on line 43 in tsl/src/compression/algorithms/null.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/compression/algorithms/null.c#L43

Added line #L43 was not covered by tests
{
elog(ERROR, "null compressor not implemented");
return NULL;
}

extern void *
null_compressor_get_dummy_block(void)
{
NullCompressed *compressed = palloc(sizeof(NullCompressed));
Size compressed_size = sizeof(NullCompressed);
compressed->compression_algorithm = COMPRESSION_ALGORITHM_NULL;
SET_VARSIZE(&compressed->vl_len_, compressed_size);
return compressed;
}
48 changes: 48 additions & 0 deletions tsl/src/compression/algorithms/null.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#pragma once

/*
* The NULL compression algorithm is a no-op compression algorithm that is only
* used to signal that all values in a compressed block are NULLs. The compression
* interface functions are only defined to comply with the framework, but they
* are not implemented and return an ERROR. Calling these function is a software
* bug.
*/

#include <postgres.h>
#include <fmgr.h>
#include <lib/stringinfo.h>

#include "compression/compression.h"

/*
* Compressor framework functions and definitions for the null algorithm.
*/

extern DecompressionIterator *null_decompression_iterator_from_datum_forward(Datum bool_compressed,
Oid element_type);

extern DecompressionIterator *null_decompression_iterator_from_datum_reverse(Datum bool_compressed,
Oid element_type);

extern void null_compressed_send(CompressedDataHeader *header, StringInfo buffer);

extern Datum null_compressed_recv(StringInfo buffer);

extern Compressor *null_compressor_for_type(Oid element_type);

extern void *null_compressor_get_dummy_block(void);

#define NULL_COMPRESS_ALGORITHM_DEFINITION \
{ \
.iterator_init_forward = null_decompression_iterator_from_datum_forward, \
.iterator_init_reverse = null_decompression_iterator_from_datum_reverse, \
.decompress_all = NULL, .compressed_data_send = null_compressed_send, \
.compressed_data_recv = null_compressed_recv, \
.compressor_for_type = null_compressor_for_type, \
.compressed_data_storage = TOAST_STORAGE_EXTERNAL, \
}
44 changes: 40 additions & 4 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "algorithms/deltadelta.h"
#include "algorithms/dictionary.h"
#include "algorithms/gorilla.h"
#include "algorithms/null.h"
#include "batch_metadata_builder.h"
#include "chunk.h"
#include "compression.h"
Expand All @@ -49,6 +50,7 @@ static const CompressionAlgorithmDefinition definitions[_END_COMPRESSION_ALGORIT
[COMPRESSION_ALGORITHM_GORILLA] = GORILLA_ALGORITHM_DEFINITION,
[COMPRESSION_ALGORITHM_DELTADELTA] = DELTA_DELTA_ALGORITHM_DEFINITION,
[COMPRESSION_ALGORITHM_BOOL] = BOOL_COMPRESS_ALGORITHM_DEFINITION,
[COMPRESSION_ALGORITHM_NULL] = NULL_COMPRESS_ALGORITHM_DEFINITION,
};

static NameData compression_algorithm_name[] = {
Expand All @@ -58,6 +60,7 @@ static NameData compression_algorithm_name[] = {
[COMPRESSION_ALGORITHM_GORILLA] = { "GORILLA" },
[COMPRESSION_ALGORITHM_DELTADELTA] = { "DELTADELTA" },
[COMPRESSION_ALGORITHM_BOOL] = { "BOOL" },
[COMPRESSION_ALGORITHM_NULL] = { "NULL" },
};

Name
Expand Down Expand Up @@ -1025,9 +1028,14 @@ row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool change
Assert(column->segment_info == NULL);

compressed_data = compressor->finish(compressor);

/* non-segment columns are NULL iff all the values are NULL */
if (compressed_data == NULL)
{
if (ts_guc_enable_null_compression &&
row_compressor->rows_compressed_into_current_value > 0)
compressed_data = null_compressor_get_dummy_block();
}
row_compressor->compressed_is_null[compressed_col] = compressed_data == NULL;

if (compressed_data != NULL)
row_compressor->compressed_values[compressed_col] =
PointerGetDatum(compressed_data);
Expand Down Expand Up @@ -1439,6 +1447,18 @@ decompress_batch(RowDecompressor *decompressor)
&decompressor->detoaster,
CurrentMemoryContext));
CompressedDataHeader *header = get_compressed_data_header(compressed_datum);

/* Special compression block with the NULL compression algorithm,
* tells that all values in the compressed block are NULLs.
*/
if (header->compression_algorithm == COMPRESSION_ALGORITHM_NULL)
{
column_info->iterator = NULL;
decompressor->compressed_is_nulls[input_column] = true;
decompressor->decompressed_is_nulls[output_index] = true;
continue;
}

column_info->iterator =
definitions[header->compression_algorithm]
.iterator_init_forward(PointerGetDatum(header), column_info->decompressed_type);
Expand Down Expand Up @@ -1701,7 +1721,10 @@ tsl_compressed_data_send(PG_FUNCTION_ARGS)
pq_begintypsend(&buf);
pq_sendbyte(&buf, header->compression_algorithm);

definitions[header->compression_algorithm].compressed_data_send(header, &buf);
if (header->compression_algorithm != COMPRESSION_ALGORITHM_NULL)
{
definitions[header->compression_algorithm].compressed_data_send(header, &buf);
}

PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
}
Expand All @@ -1717,7 +1740,14 @@ tsl_compressed_data_recv(PG_FUNCTION_ARGS)
if (header.compression_algorithm >= _END_COMPRESSION_ALGORITHMS)
elog(ERROR, "invalid compression algorithm %d", header.compression_algorithm);

return definitions[header.compression_algorithm].compressed_data_recv(buf);
if (header.compression_algorithm == COMPRESSION_ALGORITHM_NULL)
{
PG_RETURN_NULL();

Check warning on line 1745 in tsl/src/compression/compression.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/compression/compression.c#L1745

Added line #L1745 was not covered by tests
}
else
{
return definitions[header.compression_algorithm].compressed_data_recv(buf);
}
}

extern Datum
Expand Down Expand Up @@ -1812,6 +1842,9 @@ tsl_compressed_data_info(PG_FUNCTION_ARGS)
case COMPRESSION_ALGORITHM_BOOL:
has_nulls = bool_compressed_has_nulls(header);
break;
case COMPRESSION_ALGORITHM_NULL:
has_nulls = true;
break;

Check warning on line 1847 in tsl/src/compression/compression.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/compression/compression.c#L1845-L1847

Added lines #L1845 - L1847 were not covered by tests
default:
elog(ERROR, "unknown compression algorithm %d", header->compression_algorithm);
break;
Expand Down Expand Up @@ -1853,6 +1886,9 @@ tsl_compressed_data_has_nulls(PG_FUNCTION_ARGS)
case COMPRESSION_ALGORITHM_BOOL:
has_nulls = bool_compressed_has_nulls(header);
break;
case COMPRESSION_ALGORITHM_NULL:
has_nulls = true;
break;
default:
elog(ERROR, "unknown compression algorithm %d", header->compression_algorithm);
break;
Expand Down
4 changes: 3 additions & 1 deletion tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ typedef enum CompressionAlgorithm
COMPRESSION_ALGORITHM_GORILLA,
COMPRESSION_ALGORITHM_DELTADELTA,
COMPRESSION_ALGORITHM_BOOL,
COMPRESSION_ALGORITHM_NULL,

/* When adding an algorithm also add a static assert statement below */
/* end of real values */
Expand Down Expand Up @@ -317,13 +318,14 @@ pg_attribute_unused() assert_num_compression_algorithms_sane(void)
StaticAssertStmt(COMPRESSION_ALGORITHM_GORILLA == 3, "algorithm index has changed");
StaticAssertStmt(COMPRESSION_ALGORITHM_DELTADELTA == 4, "algorithm index has changed");
StaticAssertStmt(COMPRESSION_ALGORITHM_BOOL == 5, "algorithm index has changed");
StaticAssertStmt(COMPRESSION_ALGORITHM_NULL == 6, "algorithm index has changed");

/*
* This should change when adding a new algorithm after adding the new
* algorithm to the assert list above. This statement prevents adding a
* new algorithm without updating the asserts above
*/
StaticAssertStmt(_END_COMPRESSION_ALGORITHMS == 6,
StaticAssertStmt(_END_COMPRESSION_ALGORITHMS == 7,
"number of algorithms have changed, the asserts should be updated");
}

Expand Down
8 changes: 8 additions & 0 deletions tsl/src/hypercore/arrow_array.c
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,14 @@ arrow_from_compressed(Datum compressed, Oid typid, MemoryContext dest_mcxt, Memo
*/
MemoryContext oldcxt = MemoryContextSwitchTo(tmp_mcxt);
const CompressedDataHeader *header = (CompressedDataHeader *) PG_DETOAST_DATUM(compressed);
if (header->compression_algorithm == COMPRESSION_ALGORITHM_NULL)
{
/*
* The NULL compression algorithm represents all NULL values.
*/
MemoryContextSwitchTo(oldcxt);
return NULL;
}
DecompressAllFunction decompress_all =
arrow_get_decompress_all(header->compression_algorithm, typid);

Expand Down
8 changes: 6 additions & 2 deletions tsl/src/hypercore/hypercore_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -2887,8 +2887,12 @@ hypercore_index_build_callback(Relation index, ItemPointer tid, Datum *values, b

/* The number of elements in the arrow array should be the
* same as the number of rows in the segment (count
* column). */
Assert(num_rows == icstate->arrow_columns[i]->length);
* column), except when we use the NULL compression method
* to signify all values are NULLs. In this case the
* arrow_column value is NULL.
*/
Assert(icstate->arrow_columns[i] == NULL ||
num_rows == icstate->arrow_columns[i]->length);
}
else
{
Expand Down
12 changes: 11 additions & 1 deletion tsl/src/nodes/decompress_chunk/compressed_batch.c
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,18 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state
&dcontext->detoaster,
batch_state->per_batch_context));

/* Decompress the entire batch if it is supported. */
CompressedDataHeader *header = (CompressedDataHeader *) value;

/* First check if this is a block of NULL values. */
if (header->compression_algorithm == COMPRESSION_ALGORITHM_NULL)
{
column_values->decompression_type = DT_Scalar;
*column_values->output_isnull = true;
*column_values->output_value = (Datum) NULL;
return;
}

/* Decompress the entire batch if it is supported. */
ArrowArray *arrow = NULL;
if (dcontext->enable_bulk_decompression && column_description->bulk_decompression_supported)
{
Expand Down
4 changes: 3 additions & 1 deletion tsl/test/expected/compression_ddl.out
Original file line number Diff line number Diff line change
Expand Up @@ -2653,8 +2653,10 @@ SELECT compress_chunk(show_chunks('test_notnull'));
_timescaledb_internal._hyper_46_237_chunk
(1 row)

-- broken atm due to bug in default handling in compression
\set ON_ERROR_STOP 0
ALTER TABLE test_notnull ALTER COLUMN c2 SET NOT NULL;
ERROR: column "c2" of relation "_hyper_46_237_chunk" contains null values
\set ON_ERROR_STOP 1
-- test alias in parameter name
CREATE TABLE alias(time timestamptz NOT NULL);
SELECT create_hypertable('alias','time');
Expand Down
Loading

0 comments on commit 899adf1

Please sign in to comment.