Skip to content
This repository has been archived by the owner on Nov 8, 2020. It is now read-only.

Commit

Permalink
postgresql的jsonb替换为json,提高插入性能
Browse files Browse the repository at this point in the history
  • Loading branch information
无名 committed Apr 12, 2019
1 parent b97c257 commit 4e8d214
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 16 deletions.
8 changes: 4 additions & 4 deletions src/Ray.PostgreSQL/Core/PSQLBuildService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public async Task CreateEventTable(EventSubTable subTable)
{stateIdSql},
UniqueId varchar(250) null,
TypeCode varchar(100) not null,
Data jsonb not null,
Data json not null,
Version int8 not null,
Timestamp int8 not null,
constraint {subTable.SubTable}_id_unique unique(StateId,TypeCode,UniqueId)
Expand Down Expand Up @@ -91,7 +91,7 @@ public async Task CreateEventArchiveTable()
{stateIdSql},
UniqueId varchar(250) null,
TypeCode varchar(100) not null,
Data jsonb not null,
Data json not null,
Version int8 not null,
Timestamp int8 not null,
constraint {storageOptions.EventArchiveTable}_id_unique unique(StateId,TypeCode,UniqueId)
Expand Down Expand Up @@ -130,7 +130,7 @@ Id varchar(50) not null PRIMARY KEY,
EndTimestamp int8 not null,
Index int4 not null,
EventIsCleared bool not null,
Data jsonb not null,
Data json not null,
IsOver bool not null,
Version int8 not null)WITH (OIDS=FALSE);
CREATE INDEX IF NOT EXISTS {storageOptions.SnapshotArchiveTable}_StateId ON {storageOptions.SnapshotArchiveTable} USING btree(StateId)";
Expand All @@ -146,7 +146,7 @@ public async Task CreateSnapshotTable()
var sql = $@"
CREATE TABLE if not exists {storageOptions.SnapshotTable}(
{stateIdSql},
Data jsonb not null,
Data json not null,
Version int8 not null,
StartTimestamp int8 not null,
LatestMinEventTimestamp int8 not null,
Expand Down
2 changes: 1 addition & 1 deletion src/Ray.PostgreSQL/Storage/ArchiveStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public ArchiveStorage(IServiceProvider serviceProvider, ISerializer serializer,
getByIdSql = $"select * FROM {tableName} where id=@Id";
getListByStateIdSql = $"select Id,StartVersion,EndVersion,StartTimestamp,EndTimestamp,Index,EventIsCleared FROM {tableName} where stateid=@StateId";
getLatestByStateIdSql = $"select Id,StartVersion,EndVersion,StartTimestamp,EndTimestamp,Index,EventIsCleared FROM {tableName} where stateid=@StateId order by index desc limit 1";
insertSql = $"INSERT into {tableName}(Id,stateid,StartVersion,EndVersion,StartTimestamp,EndTimestamp,Index,EventIsCleared,data,IsOver,Version)VALUES(@Id,@StateId,@StartVersion,@EndVersion,@StartTimestamp,@EndTimestamp,@Index,@EventIsCleared,(@Data)::jsonb,@IsOver,@Version)";
insertSql = $"INSERT into {tableName}(Id,stateid,StartVersion,EndVersion,StartTimestamp,EndTimestamp,Index,EventIsCleared,data,IsOver,Version)VALUES(@Id,@StateId,@StartVersion,@EndVersion,@StartTimestamp,@EndTimestamp,@Index,@EventIsCleared,(@Data)::json,@IsOver,@Version)";
updateOverSql = $"update {tableName} set IsOver=@IsOver where stateid=@StateId";
updateEventIsClearSql = $"update {tableName} set EventIsCleared=true where id=@Id";
}
Expand Down
6 changes: 3 additions & 3 deletions src/Ray.PostgreSQL/Storage/DistributedTxStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void CreateEventSubRecordTable()
CREATE TABLE if not exists {options.Value.TableName}(
UnitName varchar(500) not null,
TransactionId int8 not null,
Data jsonb not null,
Data json not null,
Status int2 not null)WITH (OIDS=FALSE);
CREATE UNIQUE INDEX IF NOT EXISTS UnitName_TransId ON {options.Value.TableName} USING btree(UnitName, TransactionId)";
using (var connection = CreateConnection())
Expand Down Expand Up @@ -117,7 +117,7 @@ private async Task BatchProcessing(List<AsyncInputEvent<AppendInput, bool>> wrap
writer.StartRow();
writer.Write(wrapper.Value.UnitName, NpgsqlDbType.Varchar);
writer.Write(wrapper.Value.TransactionId, NpgsqlDbType.Bigint);
writer.Write(wrapper.Value.Data, NpgsqlDbType.Jsonb);
writer.Write(wrapper.Value.Data, NpgsqlDbType.Json);
writer.Write((short)wrapper.Value.Status, NpgsqlDbType.Smallint);
}
writer.Complete();
Expand All @@ -127,7 +127,7 @@ private async Task BatchProcessing(List<AsyncInputEvent<AppendInput, bool>> wrap
}
catch
{
var saveSql = $"INSERT INTO {options.Value.TableName}(UnitName,TransactionId,Data,Status) VALUES(@UnitName,@TransactionId,(@Data)::jsonb,@Status) ON CONFLICT ON CONSTRAINT UnitName_TransId DO NOTHING";
var saveSql = $"INSERT INTO {options.Value.TableName}(UnitName,TransactionId,Data,Status) VALUES(@UnitName,@TransactionId,(@Data)::json,@Status) ON CONFLICT ON CONSTRAINT UnitName_TransId DO NOTHING";
using (var conn = CreateConnection())
{
await conn.OpenAsync();
Expand Down
12 changes: 6 additions & 6 deletions src/Ray.PostgreSQL/Storage/EventStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ await Task.Run(async () =>
while (reader.StartRow() != -1)
{
var typeCode = reader.Read<string>(NpgsqlDbType.Varchar);
var data = reader.Read<string>(NpgsqlDbType.Jsonb);
var data = reader.Read<string>(NpgsqlDbType.Json);
var version = reader.Read<long>(NpgsqlDbType.Bigint);
var timestamp = reader.Read<long>(NpgsqlDbType.Bigint);
if (version <= endVersion && version >= startVersion)
Expand Down Expand Up @@ -93,7 +93,7 @@ await Task.Run(async () =>
{
while (reader.StartRow() != -1)
{
var data = reader.Read<string>(NpgsqlDbType.Jsonb);
var data = reader.Read<string>(NpgsqlDbType.Json);
var version = reader.Read<long>(NpgsqlDbType.Bigint);
var timestamp = reader.Read<long>(NpgsqlDbType.Bigint);
if (version >= startVersion && serializer.Deserialize(type, Encoding.Default.GetBytes(data)) is IEvent evt)
Expand Down Expand Up @@ -170,7 +170,7 @@ async Task BatchCopy(string tableName, List<AsyncInputEvent<BatchAppendTransport
writer.Write(wrapper.Value.Event.StateId);
writer.Write(wrapper.Value.UniqueId, NpgsqlDbType.Varchar);
writer.Write(wrapper.Value.Event.Event.GetType().FullName, NpgsqlDbType.Varchar);
writer.Write(Encoding.Default.GetString(wrapper.Value.BytesTransport.EventBytes), NpgsqlDbType.Jsonb);
writer.Write(Encoding.Default.GetString(wrapper.Value.BytesTransport.EventBytes), NpgsqlDbType.Json);
writer.Write(wrapper.Value.Event.Base.Version, NpgsqlDbType.Bigint);
writer.Write(wrapper.Value.Event.Base.Timestamp, NpgsqlDbType.Bigint);
}
Expand All @@ -183,7 +183,7 @@ async Task BatchCopy(string tableName, List<AsyncInputEvent<BatchAppendTransport
{
logger.LogError(ex, ex.Message);
var saveSql = saveSqlDict.GetOrAdd(tableName,
key => $"INSERT INTO {key}(stateid,uniqueId,typecode,data,version,timestamp) VALUES(@StateId,@UniqueId,@TypeCode,(@Data)::jsonb,@Version,@Timestamp) ON CONFLICT ON CONSTRAINT {key}_id_unique DO NOTHING");
key => $"INSERT INTO {key}(stateid,uniqueId,typecode,data,version,timestamp) VALUES(@StateId,@UniqueId,@TypeCode,(@Data)::json,@Version,@Timestamp) ON CONFLICT ON CONSTRAINT {key}_id_unique DO NOTHING");
await BatchInsert(saveSql, wrapperList);
}
}
Expand Down Expand Up @@ -268,7 +268,7 @@ await Task.Run(async () =>
writer.Write(wrapper.FullyEvent.StateId);
writer.Write(wrapper.UniqueId, NpgsqlDbType.Varchar);
writer.Write(wrapper.FullyEvent.Event.GetType().FullName, NpgsqlDbType.Varchar);
writer.Write(Encoding.Default.GetString(wrapper.BytesTransport.EventBytes), NpgsqlDbType.Jsonb);
writer.Write(Encoding.Default.GetString(wrapper.BytesTransport.EventBytes), NpgsqlDbType.Json);
writer.Write(wrapper.FullyEvent.Base.Version, NpgsqlDbType.Bigint);
writer.Write(wrapper.FullyEvent.Base.Timestamp, NpgsqlDbType.Bigint);
}
Expand Down Expand Up @@ -296,7 +296,7 @@ await Task.Run(async () =>
foreach (var group in groups)
{
var saveSql = saveSqlDict.GetOrAdd(group.Key,
key => $"INSERT INTO {key}(stateid,uniqueId,typecode,data,version,timestamp) VALUES(@StateId,@UniqueId,@TypeCode,(@Data)::jsonb,@Version,@Timestamp)");
key => $"INSERT INTO {key}(stateid,uniqueId,typecode,data,version,timestamp) VALUES(@StateId,@UniqueId,@TypeCode,(@Data)::json,@Version,@Timestamp)");
await conn.ExecuteAsync(saveSql, group.Select(g => new
{
g.t.FullyEvent.StateId,
Expand Down
4 changes: 2 additions & 2 deletions src/Ray.PostgreSQL/Storage/SnapshotStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public SnapshotStorage(ISerializer serializer, StorageOptions config)
this.config = config;
deleteSql = $"DELETE FROM {this.config.SnapshotTable} where stateid=@StateId";
getByIdSql = $"select * FROM {this.config.SnapshotTable} where stateid=@StateId";
insertSql = $"INSERT into {this.config.SnapshotTable}(stateid,data,version,StartTimestamp,LatestMinEventTimestamp,IsLatest,IsOver)VALUES(@StateId,(@Data)::jsonb,@Version,@StartTimestamp,@LatestMinEventTimestamp,@IsLatest,@IsOver)";
updateSql = $"update {this.config.SnapshotTable} set data=(@Data)::jsonb,version=@Version,LatestMinEventTimestamp=@LatestMinEventTimestamp,IsLatest=@IsLatest,IsOver=@IsOver where stateid=@StateId";
insertSql = $"INSERT into {this.config.SnapshotTable}(stateid,data,version,StartTimestamp,LatestMinEventTimestamp,IsLatest,IsOver)VALUES(@StateId,(@Data)::json,@Version,@StartTimestamp,@LatestMinEventTimestamp,@IsLatest,@IsOver)";
updateSql = $"update {this.config.SnapshotTable} set data=(@Data)::json,version=@Version,LatestMinEventTimestamp=@LatestMinEventTimestamp,IsLatest=@IsLatest,IsOver=@IsOver where stateid=@StateId";
updateOverSql = $"update {this.config.SnapshotTable} set IsOver=@IsOver where stateid=@StateId";
updateIsLatestSql = $"update {this.config.SnapshotTable} set IsLatest=@IsLatest where stateid=@StateId";
updateLatestTimestampSql = $"update {this.config.SnapshotTable} set LatestMinEventTimestamp=@LatestMinEventTimestamp where stateid=@StateId";
Expand Down

0 comments on commit 4e8d214

Please sign in to comment.