diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 886d737cf588..c90bfe070531 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -24,7 +24,6 @@ ydb/core/keyvalue/ut_trace TKeyValueTracingTest.WriteHuge ydb/core/keyvalue/ut_trace TKeyValueTracingTest.WriteSmall ydb/core/kqp/ut/cost KqpCost.OlapWriteRow ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.Select -ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.UpsertEvWrite ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare ydb/core/kqp/ut/olap KqpOlap.ManyColumnShardsWithRestarts diff --git a/ydb/core/data_integrity_trails/data_integrity_trails.h b/ydb/core/data_integrity_trails/data_integrity_trails.h index 1ec566048a51..fba1bc73ec1e 100644 --- a/ydb/core/data_integrity_trails/data_integrity_trails.h +++ b/ydb/core/data_integrity_trails/data_integrity_trails.h @@ -7,7 +7,7 @@ namespace NKikimr { namespace NDataIntegrity { -inline void LogKeyValue(const TString& key, const TString& value, TStringStream& ss, bool last = false) { +inline void LogKeyValue(const TStringBuf key, const TStringBuf value, TStringStream& ss, bool last = false) { ss << key << ": " << (value.empty() ? "Empty" : value) << (last ? "" : ","); } diff --git a/ydb/core/kqp/common/kqp_data_integrity_trails.h b/ydb/core/kqp/common/kqp_data_integrity_trails.h index 37c73bc4eff7..71198204a55b 100644 --- a/ydb/core/kqp/common/kqp_data_integrity_trails.h +++ b/ydb/core/kqp/common/kqp_data_integrity_trails.h @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -115,5 +116,24 @@ inline void LogIntegrityTrails(const TString& txType, const TString& traceId, ui LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, traceId, txId, shardId)); } +// WriteActor,BufferActor +inline void LogIntegrityTrails(const TString& txType, ui64 txId, TMaybe shardId, const TActorContext& ctx, const TStringBuf component) { + auto log = [](const auto& type, const auto& txId, const auto& shardId, const auto component) { + TStringStream ss; + LogKeyValue("Component", component, ss); + LogKeyValue("PhyTxId", ToString(txId), ss); + + if (shardId) { + LogKeyValue("ShardId", ToString(*shardId), ss); + } + + LogKeyValue("Type", type, ss, /*last*/ true); + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, txId, shardId, component)); +} + } } diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 593facb5704f..82f4fad93b78 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -888,6 +889,8 @@ class TKqpTableWriteActor : public TActorBootstrapped { Counters->WriteActorImmediateWritesRetries->Inc(); } + NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWrite->Record.GetTxId(), shardId, TlsActivationContext->AsActorContext(), "WriteActor"); + CA_LOG_D("Send EvWrite to ShardID=" << shardId << ", isPrepare=" << isPrepare << ", isImmediateCommit=" << isImmediateCommit << ", TxId=" << evWrite->Record.GetTxId() << ", LockTxId=" << evWrite->Record.GetLockTxId() << ", LockNodeId=" << evWrite->Record.GetLockNodeId() << ", Locks= " << [&]() { @@ -1714,6 +1717,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub FillEvWritePrepare(evWrite.get(), shardId, *TxId, TxManager); } + NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWrite->Record.GetTxId(), shardId, TlsActivationContext->AsActorContext(), "BufferActor"); + SendTime[shardId] = TInstant::Now(); CA_LOG_D("Send EvWrite (external) to ShardID=" << shardId << ", isPrepare=" << !isRollback << ", isImmediateCommit=" << isRollback << ", TxId=" << evWrite->Record.GetTxId() << ", LockTxId=" << evWrite->Record.GetLockTxId() << ", LockNodeId=" << evWrite->Record.GetLockNodeId() @@ -1733,7 +1738,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub NKikimr::MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(evWrite.release(), shardId, /* subscribe */ true), IEventHandle::FlagTrackDelivery, - 0); + 0, + BufferWriteActor.GetTraceId()); } } @@ -1816,7 +1822,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub item.SetFlags(shardInfo.AffectedFlags); } - //TODO: NDataIntegrity + NDataIntegrity::LogIntegrityTrails("PlannedTx", *TxId, {}, TlsActivationContext->AsActorContext(), "BufferActor"); + CA_LOG_D("Execute planned transaction, coordinator: " << commitInfo.Coordinator << ", volitale: " << ((transaction.GetFlags() & TEvTxProxy::TEvProposeTransaction::FlagVolatile) != 0) << ", shards: " << affectedSet.size()); diff --git a/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp b/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp index 1642e9b117c7..225d797e8668 100644 --- a/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp +++ b/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp @@ -72,14 +72,79 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) { )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - // check executer logs - UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 2); + // check write actor logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 1); // check session actor logs UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2); // check grpc logs UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2); // check datashard logs - UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 4); + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2); + } + + Y_UNIT_TEST_TWIN(UpsertEvWriteQueryService, isOlap) { + NKikimrConfig::TAppConfig AppConfig; + if (!isOlap) { + AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + } else { + AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); + } + TKikimrSettings serverSettings = TKikimrSettings().SetAppConfig(AppConfig); + TStringStream ss; + serverSettings.LogStream = &ss; + TKikimrRunner kikimr(serverSettings); + kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE); + + auto db = kikimr.GetQueryClient(); + auto session = db.GetSession().GetValueSync().GetSession(); + + { + const TString query = Sprintf(R"( + CREATE TABLE `/Root/test_evwrite` ( + Key Int64 NOT NULL, + Value String, + primary key (Key) + ) WITH (STORE=%s); + )", isOlap ? "COLUMN" : "ROW"); + + auto result = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + ss.Clear(); + + auto result = session.ExecuteQuery(R"( + --!syntax_v1 + + UPSERT INTO `/Root/test_evwrite` (Key, Value) VALUES + (3u, "Value3"), + (101u, "Value101"), + (201u, "Value201"); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + if (!isOlap) { + // check write actor logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 1); + // check session actor logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2); + // check grpc logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2); + // check datashard logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2); + } else { + // check write actor logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 3); + // check executer logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 4); + // check session actor logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2); + // check grpc logs + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2); + // check columnshard logs + // ColumnShard doesn't have integrity logs. + // UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: ColumnShard"), 6); + } } Y_UNIT_TEST(Ddl) {