Skip to content

Commit

Permalink
loading of modules in child process
Browse files Browse the repository at this point in the history
- config file(s) are scanned for "loadmodule" directives
  to have modules loaded again in child process, so they can
  properly serialize their data

Fixes #130
  • Loading branch information
tporadowski committed Feb 10, 2022
1 parent b80b1be commit 223d9e1
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 4 deletions.
45 changes: 44 additions & 1 deletion src/Win32_Interop/Win32_QFork_impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,55 @@
#include "..\server.h"
#include "Win32_Portability.h"

/*
* Config file parameter filter that allows only "loadmodule" directives.
*/
int ConfigFilterModulesOnly(sds configLine) {
return strcasecmp(configLine, "loadmodule") == 0 ? TRUE : FALSE;
}

/*
* Call to "SetupRedisGlobals" copies over "server" structure from the main process, but here
* we are initializing parts of the application to allow modules to be loaded from config file,
* so they can take part in asynchronous saves. For that we're clearing "server.commands",
* "server.orig_commands", and then only take "loadmodule" directives from main process' config
* file. This function consists of parts from server.c/main() function.
*/
void InitRedisModulesFromConfigFile() {
#pragma warning( suppress : 6031 )
pthread_mutex_init(&lazyfree_objects_mutex, NULL);
#pragma warning( suppress : 6031 )
pthread_mutex_init(&moduleUnblockedClientsMutex, NULL);
#pragma warning( suppress : 6031 )
pthread_mutex_init(&moduleGIL, NULL);
moduleInitModulesSystem();

//re-create commands as the dictionary coming from main process contains commands
// registered by modules, which would prevent them from loading in this child process
server.commands = dictCreate(server.commands->type, NULL);
server.orig_commands = dictCreate(server.orig_commands->type, NULL);
populateCommandTable();

//read only "loadmodule" directives from config file(s)
loadServerConfig(server.configfile, NULL, &ConfigFilterModulesOnly);

//load modules found in config file(s)
moduleLoadFromQueue();
}

void SetupRedisGlobals(LPVOID redisData, size_t redisDataSize, uint8_t *dictHashSeed, LPVOID redisModules)
{
#ifndef NO_QFORKIMPL
memcpy(&server, redisData, redisDataSize);
dictSetHashFunctionSeed(dictHashSeed);
modules = (dict*) redisModules;

//WORKAROUND: if main process was started with a config file - it contains modules
// to be loaded in this child process, so load them here as well
if (server.configfile) {
InitRedisModulesFromConfigFile();
} else {
modules = (dict*)redisModules;
}
#endif
}

Expand Down
27 changes: 25 additions & 2 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,11 @@ void queueLoadModule(sds path, sds *argv, int argc) {
listAddNodeTail(server.loadmodule_queue,loadmod);
}

void loadServerConfigFromString(char *config) {
#ifdef _WIN32
void loadServerConfigFromString(char *config, configLineFilter lineFilter) {
#else
void loadServerConfigFromString(char* config) {
#endif
char *err = NULL;
int linenum = 0, totlines, i;
int slaveof_linenum = 0;
Expand Down Expand Up @@ -210,6 +214,13 @@ void loadServerConfigFromString(char *config) {
}
sdstolower(argv[0]);

#ifdef _WIN32
//skip config parameter if "line filter" returns FALSE
if (lineFilter && lineFilter(argv[0]) == FALSE) {
continue;
}
#endif

/* Execute config directives */
if (!strcasecmp(argv[0],"timeout") && argc == 2) {
server.maxidletime = atoi(argv[1]);
Expand Down Expand Up @@ -353,7 +364,11 @@ void loadServerConfigFromString(char *config) {
err = "Invalid number of databases"; goto loaderr;
}
} else if (!strcasecmp(argv[0],"include") && argc == 2) {
#ifdef _WIN32
loadServerConfig(argv[1],NULL,lineFilter);
#else
loadServerConfig(argv[1],NULL);
#endif
} else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
server.maxclients = atoi(argv[1]);
if (server.maxclients < 1) {
Expand Down Expand Up @@ -922,7 +937,11 @@ void loadServerConfigFromString(char *config) {
* Both filename and options can be NULL, in such a case are considered
* empty. This way loadServerConfig can be used to just load a file or
* just load a string. */
void loadServerConfig(char *filename, char *options) {
#ifdef _WIN32
void loadServerConfig(char *filename, char *options, configLineFilter lineFilter) {
#else
void loadServerConfig(char* filename, char* options) {
#endif
sds config = sdsempty();
char buf[CONFIG_MAX_LINE+1];

Expand All @@ -948,7 +967,11 @@ void loadServerConfig(char *filename, char *options) {
config = sdscat(config,"\n");
config = sdscat(config,options);
}
#ifdef _WIN32
loadServerConfigFromString(config,lineFilter);
#else
loadServerConfigFromString(config);
#endif
sdsfree(config);
}

Expand Down
4 changes: 4 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -4479,7 +4479,11 @@ int main(int argc, char **argv) {
exit(1);
}
resetServerSaveParams();
#ifdef _WIN32
loadServerConfig(configfile,options,NULL);
#else
loadServerConfig(configfile,options);
#endif
sdsfree(options);
}

Expand Down
8 changes: 7 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1845,7 +1845,13 @@ int keyspaceEventsStringToFlags(char *classes);
sds keyspaceEventsFlagsToString(int flags);

/* Configuration */
void loadServerConfig(char *filename, char *options);
#ifdef _WIN32
//callback to include/skip given config file parameter; TRUE = include, FALSE = skip
typedef int (*configLineFilter)(sds);
void loadServerConfig(char *filename, char *options, configLineFilter lineFilter);
#else
void loadServerConfig(char* filename, char* options);
#endif
void appendServerSaveParams(time_t seconds, int changes);
void resetServerSaveParams(void);
struct rewriteConfigState; /* Forward declaration to export API. */
Expand Down

0 comments on commit 223d9e1

Please sign in to comment.