Skip to content

Commit

Permalink
Implement MatchingStats for SQL mode
Browse files Browse the repository at this point in the history
  • Loading branch information
jnsrnhld committed Jan 25, 2024
1 parent 867236f commit 650243e
Show file tree
Hide file tree
Showing 30 changed files with 1,002 additions and 311 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package com.bakdata.conquery.mode.cluster;

import java.util.HashMap;
import java.util.Map;

import com.bakdata.conquery.models.common.daterange.CDateRange;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.datasets.concepts.MatchingStats;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId;
import com.fasterxml.jackson.annotation.JsonIgnore;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
 * Matching stats for cluster mode: aggregates the per-worker {@link Entry} reports
 * into totals for number of events, number of entities and the spanned date range.
 * <p>
 * Aggregates are computed lazily and cached; {@link #putEntry(WorkerId, Entry)}
 * invalidates the cache whenever a worker submits a new entry.
 */
@Setter
public class WorkerMatchingStats implements MatchingStats {

	private Map<WorkerId, Entry> entries = new HashMap<>();

	// Cached aggregates, recomputed on demand. Declared volatile because the
	// count*/span* methods use double-checked locking: the first read happens
	// outside the synchronized block, and without volatile the Java memory model
	// permits stale reads and (for long) torn reads there.
	@JsonIgnore
	private transient volatile CDateRange span;

	// -1 marks "not computed yet / invalidated".
	@JsonIgnore
	private transient volatile long numberOfEvents = -1L;

	@JsonIgnore
	private transient volatile long numberOfEntities = -1L;

	@Override
	public long countEvents() {
		if (numberOfEvents == -1L) {
			synchronized (this) {
				if (numberOfEvents == -1L) {
					numberOfEvents = entries.values().stream().mapToLong(Entry::getNumberOfEvents).sum();
				}
			}
		}
		return numberOfEvents;
	}


	@Override
	public long countEntities() {
		if (numberOfEntities == -1L) {
			synchronized (this) {
				if (numberOfEntities == -1L) {
					numberOfEntities = entries.values().stream().mapToLong(Entry::getNumberOfEntities).sum();
				}
			}
		}
		return numberOfEntities;
	}

	@Override
	public CDateRange spanEvents() {
		if (span == null) {
			synchronized (this) {
				if (span == null) {
					// NOTE(review): CDateRange.all() as the reduction identity looks suspicious —
					// if spanClosed(all, x) == all, the result is always the full range once any
					// entry exists. Verify this identity is intended (TODO confirm).
					span = entries.values().stream().map(Entry::getSpan).reduce(CDateRange.all(), CDateRange::spanClosed);
				}
			}
		}
		return span;

	}

	/**
	 * Replaces the entry reported by the given worker and invalidates all cached
	 * aggregates so they are recomputed on next access.
	 */
	public void putEntry(WorkerId source, Entry entry) {
		synchronized (this) {
			entries.put(source, entry);
			span = null;
			numberOfEntities = -1L;
			numberOfEvents = -1L;
		}
	}

	/**
	 * Per-worker accumulator: counts events and distinct entities, and tracks the
	 * closed span over all date-compatible column values seen.
	 */
	@Data
	@NoArgsConstructor
	@AllArgsConstructor
	public static class Entry {
		private long numberOfEvents;

		// Entity ids already counted; kept out of serialization, only numberOfEntities is shipped.
		@JsonIgnore
		private final IntSet foundEntities = new IntOpenHashSet();
		private long numberOfEntities;
		private CDateRange span;


		public void addEvent(Table table, Bucket bucket, int event, int entityForEvent) {
			numberOfEvents++;
			// Count each entity only once, however many events it has.
			if (foundEntities.add(entityForEvent)) {
				numberOfEntities++;
			}

			// Widen the span by every date-compatible column value present for this event.
			for (Column c : table.getColumns()) {
				if (!c.getType().isDateCompatible()) {
					continue;
				}

				if (!bucket.has(event, c)) {
					continue;
				}

				final CDateRange time = bucket.getAsDateRange(event, c);
				span = time.spanClosed(span);
			}
		}
	}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.bakdata.conquery.sql.conversion.dialect.HanaSqlDialect;
import com.bakdata.conquery.sql.conversion.dialect.PostgreSqlDialect;
import com.bakdata.conquery.sql.conversion.dialect.SqlDialect;
import com.bakdata.conquery.sql.conversion.dialect.SqlFunctionProvider;
import com.bakdata.conquery.sql.execution.ResultSetProcessorFactory;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import io.dropwizard.setup.Environment;
Expand All @@ -35,14 +36,15 @@ public DelegateManager<LocalNamespace> provideManager(ConqueryConfig config, Env
SqlConnectorConfig sqlConnectorConfig = config.getSqlConnectorConfig();
DSLContext dslContext = DslContextFactory.create(sqlConnectorConfig);
SqlDialect sqlDialect = createSqlDialect(sqlConnectorConfig, dslContext);
SqlFunctionProvider functionProvider = sqlDialect.getFunctionProvider();
SqlContext sqlContext = new SqlContext(sqlConnectorConfig, sqlDialect);

SqlExecutionService sqlExecutionService = new SqlExecutionService(
sqlDialect.getDSLContext(),
ResultSetProcessorFactory.create(sqlDialect)
);

NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, creator, sqlContext, sqlExecutionService);
NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, creator, sqlContext, sqlExecutionService, functionProvider);
DatasetRegistry<LocalNamespace> datasetRegistry = ManagerProvider.createLocalDatasetRegistry(namespaceHandler, config, creator, sqlExecutionService);
creator.init(datasetRegistry);

Expand All @@ -51,7 +53,7 @@ public DelegateManager<LocalNamespace> provideManager(ConqueryConfig config, Env
environment,
datasetRegistry,
new FailingImportHandler(),
new LocalStorageListener(),
new LocalStorageListener(datasetRegistry),
EMPTY_NODE_PROVIDER,
List.of(),
creator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.bakdata.conquery.models.worker.LocalNamespace;
import com.bakdata.conquery.sql.SqlContext;
import com.bakdata.conquery.sql.conquery.SqlExecutionManager;
import com.bakdata.conquery.sql.conversion.dialect.SqlFunctionProvider;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import lombok.RequiredArgsConstructor;

Expand All @@ -22,6 +23,7 @@ public class LocalNamespaceHandler implements NamespaceHandler<LocalNamespace> {
private final InternalObjectMapperCreator mapperCreator;
private final SqlContext sqlContext;
private final SqlExecutionService sqlExecutionService;
private final SqlFunctionProvider functionProvider;

@Override
public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, IndexService indexService) {
Expand All @@ -33,6 +35,7 @@ public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaSto
namespaceStorage,
executionManager,
sqlExecutionService,
functionProvider,
namespaceData.getJobManager(),
namespaceData.getFilterSearch(),
namespaceData.getIndexService(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package com.bakdata.conquery.mode.local;

import java.util.Collection;

import com.bakdata.conquery.mode.StorageListener;
import com.bakdata.conquery.models.datasets.Dataset;
import com.bakdata.conquery.models.datasets.SecondaryIdDescription;
import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.jobs.SqlUpdateMatchingStatsJob;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.LocalNamespace;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class LocalStorageListener implements StorageListener {

// When running without shards, no further actions are required
private final DatasetRegistry<LocalNamespace> datasetRegistry;

@Override
public void onAddSecondaryId(SecondaryIdDescription secondaryId) {
Expand Down Expand Up @@ -36,5 +43,17 @@ public void onDeleteConcept(Concept<?> concept) {

/**
 * Schedules a slow job that recomputes matching stats for all concepts of the
 * given dataset via SQL.
 */
@Override
public void onUpdateMatchingStats(Dataset dataset) {

	final LocalNamespace namespace = datasetRegistry.get(dataset.getId());
	final Collection<Concept<?>> concepts = namespace.getStorage().getAllConcepts();

	SqlUpdateMatchingStatsJob matchingStatsJob = new SqlUpdateMatchingStatsJob(
			datasetRegistry.getConfig().getSqlConnectorConfig(),
			namespace.getSqlExecutionService(),
			namespace.getFunctionProvider(),
			concepts
	);

	// Reuse the namespace resolved above instead of performing a second registry lookup.
	namespace.getJobManager().addSlowJob(matchingStatsJob);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.bakdata.conquery.mode.local;

import com.bakdata.conquery.models.common.daterange.CDateRange;
import com.bakdata.conquery.models.datasets.concepts.MatchingStats;
import lombok.Value;

/**
 * Immutable {@link MatchingStats} implementation for SQL mode: all aggregates are
 * computed up front and simply returned here.
 * <p>
 * Lombok {@code @Value} makes the fields private final and generates the
 * all-args constructor, getters, {@code equals}/{@code hashCode} and {@code toString}.
 */
@Value
public class SqlMatchingStats implements MatchingStats {

	// Precomputed aggregates, fixed at construction time.
	long numberOfEvents;
	long numberOfEntities;
	CDateRange span;

	@Override
	public long countEvents() {
		return numberOfEvents;
	}

	@Override
	public long countEntities() {
		return numberOfEntities;
	}

	@Override
	public CDateRange spanEvents() {
		// May be null if no date-compatible data was found (interface marks this @Nullable).
		return span;
	}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
@AllArgsConstructor
public class SqlConnectorConfig {

public static final String DEFAULT_PRIMARY_COLUMN = "pid";

boolean enabled;

private Dialect dialect;
Expand All @@ -26,5 +28,5 @@ public class SqlConnectorConfig {

private String jdbcConnectionUrl;

private String primaryColumn = "pid";
private String primaryColumn = DEFAULT_PRIMARY_COLUMN;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Set;
import java.util.stream.Stream;

import javax.annotation.Nullable;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;

Expand Down Expand Up @@ -33,7 +34,12 @@ public class Table extends Labeled<TableId> implements NamespacedIdentifiable<Ta
@Valid
@JsonManagedReference
private Column[] columns = new Column[0];

/**
* Defines the primary key/column of this table. Only required for SQL mode.
*/
@Nullable
@JsonManagedReference
private Column primaryColum;

@ValidationMethod(message = "More than one column map to the same secondaryId")
@JsonIgnore
Expand Down
Original file line number Diff line number Diff line change
@@ -1,107 +1,16 @@
package com.bakdata.conquery.models.datasets.concepts;

import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;

import com.bakdata.conquery.models.common.daterange.CDateRange;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId;
import com.fasterxml.jackson.annotation.JsonIgnore;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import lombok.*;

@Getter
@Setter
public class MatchingStats {
public interface MatchingStats {

private Map<WorkerId, Entry> entries = new HashMap<>();
@JsonIgnore
private transient CDateRange span;
long countEvents();

@JsonIgnore
private transient long numberOfEvents = -1L;
long countEntities();

@JsonIgnore
private transient long numberOfEntities = -1L;

public long countEvents() {
if (numberOfEvents == -1L) {
synchronized (this) {
if (numberOfEvents == -1L) {
numberOfEvents = entries.values().stream().mapToLong(Entry::getNumberOfEvents).sum();
}
}
}
return numberOfEvents;
}


public long countEntities() {
if (numberOfEntities == -1L) {
synchronized (this) {
if (numberOfEntities == -1L) {
numberOfEntities = entries.values().stream().mapToLong(Entry::getNumberOfEntities).sum();
}
}
}
return numberOfEntities;
}

public CDateRange spanEvents() {
if (span == null) {
synchronized (this) {
if (span == null) {
span = entries.values().stream().map(Entry::getSpan).reduce(CDateRange.all(), CDateRange::spanClosed);
}
}
}
return span;

}

public void putEntry(WorkerId source, Entry entry) {
synchronized (this) {
entries.put(source, entry);
span = null;
numberOfEntities = -1L;
numberOfEvents = -1L;
}
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Entry {
private long numberOfEvents;

@JsonIgnore
private final IntSet foundEntities = new IntOpenHashSet();
private long numberOfEntities;
private CDateRange span;


public void addEvent(Table table, Bucket bucket, int event, int entityForEvent) {
numberOfEvents++;
if (foundEntities.add(entityForEvent)) {
numberOfEntities++;
}

for (Column c : table.getColumns()) {
if (!c.getType().isDateCompatible()) {
continue;
}

if (!bucket.has(event, c)) {
continue;
}

final CDateRange time = bucket.getAsDateRange(event, c);
span = time.spanClosed(span);
}
}
}
@Nullable
CDateRange spanEvents();

}
Loading

0 comments on commit 650243e

Please sign in to comment.