Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use caffeine in CachedStore and adapt ManagedInternalForm for state p… #3607

Open
wants to merge 35 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
8f25433
use caffeine in CachedStore and adapt ManagedInternalForm for state p…
thoniTUB Oct 21, 2024
0b96b94
Fix serdes problem in LocalAuthenticationRealm
thoniTUB Oct 22, 2024
07e810d
adds test for CachedStore
thoniTUB Oct 22, 2024
bcf05f6
fix race condition in ManagedQuery#lastResultCount
thoniTUB Oct 22, 2024
e283fcd
adds documentation
thoniTUB Oct 22, 2024
16c4cbb
pass config on construction not on init
thoniTUB Oct 22, 2024
0ca6358
fix cache invalidation on value update
thoniTUB Oct 24, 2024
313ed5c
review changes and fixing the add behaviour
thoniTUB Oct 24, 2024
8425e58
adds metrics
thoniTUB Oct 24, 2024
db7a03f
fix double metrics recorder
thoniTUB Oct 24, 2024
26cb189
limit sending of RegisterColumnValues jobs with semaphore
thoniTUB Oct 24, 2024
4037f5c
makes storages load on start configurable
thoniTUB Nov 7, 2024
c161294
adds task to load specific storages
thoniTUB Nov 8, 2024
5532a0a
Merge branch 'develop' into feature/use-caffeine-cache
thoniTUB Nov 8, 2024
ad52330
fix serdes issue with NonPersistentStoreFactory
thoniTUB Nov 13, 2024
75276ab
Merge branch 'refs/heads/develop' into feature/use-caffeine-cache
thoniTUB Nov 13, 2024
efdd9ac
Merge remote-tracking branch 'origin/develop' into feature/use-caffei…
thoniTUB Dec 2, 2024
1631fdf
adds logging for evicted entries
thoniTUB Dec 2, 2024
c2227d4
Merge branch 'develop' into feature/use-caffeine-cache
thoniTUB Dec 4, 2024
53e8d35
use parallel deserialization in BucketManager
thoniTUB Dec 10, 2024
5e6ade9
switch back to serial registration of buckets and cblocks
thoniTUB Dec 10, 2024
aad87d9
fix missing Owner relation to ManagedExecutionId and FormConfigId
thoniTUB Dec 10, 2024
a541b79
move direct storage access out of loop
thoniTUB Dec 10, 2024
553e954
revert using id in delete path because it is missing the meta storage
thoniTUB Dec 10, 2024
2d32249
revert using id in delete path because it is missing the meta storage
thoniTUB Dec 10, 2024
6a8e16b
adds logs
thoniTUB Dec 11, 2024
21cfd8e
improve performance for consistency
thoniTUB Dec 11, 2024
77af16d
adds logging for cache misses
thoniTUB Dec 13, 2024
64244ea
improve query plan creation by not resolving buckets
thoniTUB Dec 13, 2024
3a7f482
improve BucketManager creation time by only loading CBlocks
thoniTUB Dec 13, 2024
d674c77
allow caching of xodus blobs
thoniTUB Dec 16, 2024
a7f8f7a
Merge branch 'develop' into feature/use-caffeine-cache
thoniTUB Dec 16, 2024
067bb73
fix default option and log
thoniTUB Dec 17, 2024
9dd290e
fix unfinished transaction finish
thoniTUB Dec 17, 2024
5190da1
renew transactions
thoniTUB Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,12 @@ public FullExecutionStatus getSingleEntityExport(Subject subject, UriBuilder uri
throw new ConqueryError.ExecutionProcessingError();
}

// Workaround update our execution as the lastresultcount was set in the background
final EntityPreviewExecution executionFinished = (EntityPreviewExecution) execution.getId().resolve();
executionFinished.initExecutable(config);

final FullExecutionStatus status = execution.buildStatusFull(subject, namespace);
status.setResultUrls(getResultAssets(config.getResultProviders(), execution, uriBuilder, false));
status.setResultUrls(getResultAssets(config.getResultProviders(), executionFinished, uriBuilder, false));
return status;
}

Expand Down Expand Up @@ -571,7 +574,7 @@ public Stream<Map<String, String>> resolveEntities(Subject subject, List<FilterV
.filter(Predicate.not(Map::isEmpty));
}

public ResultStatistics getResultStatistics(SingleTableResult managedQuery) {
public <E extends ManagedExecution & SingleTableResult> ResultStatistics getResultStatistics(E managedQuery) {

final Locale locale = I18n.LOCALE.get();
final NumberFormat decimalFormat = NumberFormat.getNumberInstance(locale);
Expand All @@ -584,6 +587,8 @@ public ResultStatistics getResultStatistics(SingleTableResult managedQuery) {
new PrintSettings(true, locale, managedQuery.getNamespace(), config, null, null, decimalFormat, integerFormat);
final UniqueNamer uniqueNamer = new UniqueNamer(printSettings);

managedQuery.initExecutable(config);
thoniTUB marked this conversation as resolved.
Show resolved Hide resolved

final List<ResultInfo> resultInfos = managedQuery.getResultInfos();

final Optional<ResultInfo>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@
import java.util.stream.Stream;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.models.execution.ExecutionState;
import com.bakdata.conquery.models.execution.ManagedExecution;
import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;
import com.bakdata.conquery.models.identifiable.ids.specific.UserId;
import com.bakdata.conquery.models.query.ExecutionManager;
import com.bakdata.conquery.models.query.ManagedQuery;
import com.bakdata.conquery.models.query.QueryPlanContext;
import com.bakdata.conquery.models.query.QueryResolveContext;
Expand All @@ -27,8 +24,6 @@ public abstract class Query implements QueryDescription {

public abstract QueryPlan<?> createQueryPlan(QueryPlanContext context);

public abstract void collectRequiredQueries(Set<ManagedExecutionId> requiredQueries);

@Override
public abstract void resolve(QueryResolveContext context);

Expand All @@ -38,6 +33,8 @@ public Set<ManagedExecutionId> collectRequiredQueries() {
return set;
}

public abstract void collectRequiredQueries(Set<ManagedExecutionId> requiredQueries);

@JsonIgnore
public abstract List<ResultInfo> getResultInfos();

Expand All @@ -59,7 +56,6 @@ public CQElement getReusableComponents() {
*
* @param results
* @return the number of results in the result List.
* @see ManagedExecution#finish(ExecutionState, ExecutionManager) for how it's used.
*/
public long countResults(Stream<EntityResult> results) {
return results.map(EntityResult::listResultLines)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public <E extends ManagedExecution & SingleTableResult> Response createResult(Su
// Check if subject is permitted to download on all datasets that were referenced by the query
authorizeDownloadDatasets(subject, exec);

// Initialize execution so columns can be correctly accounted
exec.initExecutable(config);

final IdPrinter idPrinter = IdColumnUtil.getIdPrinter(subject, exec, namespace, config.getIdColumns().getIds());

// Get the locale extracted by the LocaleFilter
Expand Down
Original file line number Diff line number Diff line change
@@ -1,71 +1,73 @@
package com.bakdata.conquery.io.storage.xodus.stores;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Stream;

import com.bakdata.conquery.io.jackson.serializer.IdReferenceResolvingException;
import com.bakdata.conquery.io.storage.Store;
import com.bakdata.conquery.io.storage.xodus.stores.SerializingStore.IterationStatistic;
import com.bakdata.conquery.util.io.ProgressBar;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.CaffeineSpec;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.base.Stopwatch;
import com.jakewharton.byteunits.BinaryByteUnit;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

@RequiredArgsConstructor
@Slf4j
@ToString(onlyExplicitlyIncluded = true)
public class CachedStore<KEY, VALUE> implements Store<KEY, VALUE> {

private static final ProgressBar PROGRESS_BAR = new ProgressBar(0, System.out);

private ConcurrentHashMap<KEY, VALUE> cache = new ConcurrentHashMap<>();
private final LoadingCache<KEY, VALUE> cache;
@ToString.Include
private final Store<KEY, VALUE> store;

public CachedStore(Store<KEY, VALUE> store, CaffeineSpec caffeineSpec) {
this.store = store;
cache = Caffeine.from(caffeineSpec)
// .recordStats(() -> new MetricsStatsCounter(metricRegistry, "cache."+store.toString()))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Metrics hinzufügen

.build(this.store::get);
}

@Override
public void add(KEY key, VALUE value) {
if (cache.putIfAbsent(key, value) != null) {
throw new IllegalStateException("The id " + key + " is already part of this store");
}
store.add(key, value);
// We don't distinguish between add and update on this layer. Let a deeper layer complain
thoniTUB marked this conversation as resolved.
Show resolved Hide resolved
update(key, value);
}

@Override
public VALUE get(KEY key) {
return cache.computeIfAbsent(key, store::get);
public void update(KEY key, VALUE value) {
store.update(key, value);
cache.invalidate(key);
}

@Override
public IterationStatistic forEach(StoreEntryConsumer<KEY, VALUE> consumer) {
return store.forEach(consumer);
public VALUE get(KEY key) {
return cache.get(key);
}

@Override
public void update(KEY key, VALUE value) {
cache.put(key, value);
store.update(key, value);
public IterationStatistic forEach(StoreEntryConsumer<KEY, VALUE> consumer) {
store.getAllKeys().forEach( k -> consumer.accept(k, cache.get(k), 0 /*Leaky?*/));
return null;
}

@Override
public void remove(KEY key) {
cache.remove(key);
store.remove(key);
}

@Override
public int count() {
if (cache.isEmpty()) {
return store.count();
}
return cache.size();
cache.invalidate(key);
}

@Override
public void loadData() {
final LongAdder totalSize = new LongAdder();
final int count = count();
cache = new ConcurrentHashMap<>(count);
final ProgressBar bar;

if (count > 100) {
Expand Down Expand Up @@ -108,36 +110,37 @@ public void loadData() {
});
log.debug("\tloaded store {}: {} entries, {} within {}",
this,
cache.values().size(),
count,
BinaryByteUnit.format(totalSize.sum()),
timer.stop()
);
}

@Override
public Stream<VALUE> getAll() {
return cache.values().stream();
public int count() {
return store.count();
}

@Override
public String toString() {
return "cached " + store.toString();
public Stream<VALUE> getAll() {
return store.getAllKeys().map(cache::get);
}

@Override
public Stream<KEY> getAllKeys() {
return cache.keySet().stream();
return store.getAllKeys();
}

@Override
public void clear() {
cache.clear();
store.clear();
cache.invalidateAll();
}

@Override
public void removeStore() {
store.removeStore();
cache.invalidateAll();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import jetbrains.exodus.env.Store;
import jetbrains.exodus.env.StoreConfig;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand All @@ -25,6 +26,7 @@ public class XodusStore {
private final Consumer<XodusStore> storeCloseHook;
private final Consumer<XodusStore> storeRemoveHook;
@Getter
@ToString.Include
thoniTUB marked this conversation as resolved.
Show resolved Hide resolved
private final String name;

public XodusStore(Environment env, String name, Consumer<XodusStore> storeCloseHook, Consumer<XodusStore> storeRemoveHook) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.bakdata.conquery.models.identifiable.ids.specific.UserId;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.CaffeineSpec;
import com.google.common.collect.ImmutableList;
import com.password4j.HashingFunction;
import com.password4j.Password;
Expand Down Expand Up @@ -66,19 +67,16 @@ public class LocalAuthenticationRealm extends AuthenticatingRealm implements Con

private final XodusConfig passwordStoreConfig;
private final String storeName;

@JsonIgnore
private Environment passwordEnvironment;
@JsonIgnore
private Store<UserId, HashEntry> passwordStore;

@JsonIgnore
private final ConqueryTokenRealm centralTokenRealm;
private final Duration validDuration;
private final Validator validator;
private final ObjectMapper mapper;

private final HashingFunction defaultHashingFunction;
@JsonIgnore
private Environment passwordEnvironment;
@JsonIgnore
private Store<UserId, HashEntry> passwordStore;

//////////////////// INITIALIZATION ////////////////////

Expand Down Expand Up @@ -116,7 +114,9 @@ protected void onInit() {
false,
true,
null, Executors.newSingleThreadExecutor()
));
),
CaffeineSpec.parse("") // TODO add configuration
thoniTUB marked this conversation as resolved.
Show resolved Hide resolved
);
}

//////////////////// AUTHENTICATION ////////////////////
Expand Down Expand Up @@ -145,14 +145,31 @@ public String createAccessToken(String username, String password) {
throw new CredentialsException("No password hash was found for user: " + username);
}

final String hash = hashedEntry.getHash();
final String hash = hashedEntry.hash();
if (!Password.check(password.getBytes(), hash.getBytes()).with(PasswordHelper.getHashingFunction(hash))) {
throw new IncorrectCredentialsException("Password was invalid for user: " + userId);
}

return centralTokenRealm.createTokenForUser(userId, validDuration);
}

@Override
public boolean addUser(@NonNull User user, @NonNull CredentialType credential) {

try {
final HashEntry hashEntry = toHashEntry(credential);
passwordStore.add(user.getId(), hashEntry);
log.debug("Added user to realm: {}", user.getId());
return true;
}
catch (IllegalArgumentException e) {
log.warn("Unable to add user '{}'", user.getId(), e);
}
return false;
}

//////////////////// USER MANAGEMENT ////////////////////

/**
* Converts the provided password to a Xodus compatible hash.
*/
Expand All @@ -171,22 +188,6 @@ else if (credential instanceof PasswordHashCredential passwordHashCredential) {
throw new IllegalArgumentException("CredentialType not supported yet: " + credential.getClass());
}

//////////////////// USER MANAGEMENT ////////////////////

@Override
public boolean addUser(@NonNull User user, @NonNull CredentialType credential) {

try {
final HashEntry hashEntry = toHashEntry(credential);
passwordStore.add(user.getId(), hashEntry);
return true;
}
catch (IllegalArgumentException e) {
log.warn("Unable to add user '{}'", user.getId(), e);
}
return false;
}

@Override
public boolean updateUser(User user, CredentialType credential) {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.bakdata.conquery.models.auth.basic;

import lombok.Data;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -13,12 +12,10 @@
@Slf4j
public class PasswordHasher {

@Data
/**
* Container class for the entries in the store consisting of the salted password hash and the corresponding salt.
*/
public static class HashEntry {
final String hash;
public record HashEntry(String hash) {
}

}
Loading
Loading