Skip to content

Commit

Permalink
fix: Make replica's REPLCONF IP-ADDRESS optional (#3473)
Browse files Browse the repository at this point in the history
**Background**

In v1.21.0 we introduced support for `--announce_ip` for replicas to
announce their public IP addresses.

Like Valkey, this uses `REPLCONF IP-ADDRESS` to announce their IP
address.

**The issue**

Older Dragonfly releases (<1.21) did not support this feature. The
master side simply returned an error for such `REPLCONF` attempts,
however the replica code failed the replication, resulting in
incompatible versions.

**The fix**

The fix is simple, just log an error if the master did not respect
`REPLCONF IP-ADDRESS`. We can make this non-optional in the future
again.

However, in addition, I added a regression test to make sure we are
backwards compatible with v1.19.2. We'll bump this up every once in a
while.
  • Loading branch information
chakaz authored Aug 8, 2024
1 parent 7b9bbc1 commit b6b42c9
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 2 deletions.
3 changes: 2 additions & 1 deletion src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ error_code Replica::Greet() {
auto announce_ip = absl::GetFlag(FLAGS_announce_ip);
if (!announce_ip.empty()) {
RETURN_ON_ERR(SendCommandAndReadResponse(StrCat("REPLCONF ip-address ", announce_ip)));
PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK"));
LOG_IF(WARNING, !CheckRespIsSimpleReply("OK"))
<< "Master did not OK announced IP address, perhaps it is using an old version";
}

// Corresponds to server.repl_state == REPL_STATE_SEND_CAPA
Expand Down
6 changes: 5 additions & 1 deletion tests/dragonfly/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ def __init__(self, params: DflyParams, args):
self.params = params
self.instances = []

def create(self, existing_port=None, **kwargs) -> DflyInstance:
def create(self, existing_port=None, path=None, **kwargs) -> DflyInstance:
args = {**self.args, **kwargs}
args.setdefault("dbfilename", "")
args.setdefault("noversion_check", None)
Expand All @@ -352,6 +352,10 @@ def create(self, existing_port=None, **kwargs) -> DflyInstance:
params = dataclasses.replace(self.params, existing_port=existing_port)
else:
params = self.params

if path is not None:
params = dataclasses.replace(self.params, path=path)

instance = DflyInstance(params, args)
self.instances.append(instance)
return instance
Expand Down
67 changes: 67 additions & 0 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
import pytest
import asyncio
import async_timeout
import platform
import pymemcache
import logging
import tarfile
import urllib.request
from redis import asyncio as aioredis
from .utility import *
from .instance import DflyInstanceFactory, DflyInstance
Expand Down Expand Up @@ -2261,3 +2264,67 @@ async def check_replica_disconnected():
await asyncio.sleep(1) # wait for the master to recognize it's being blocked
await check_replica_disconnected()
df_factory.stop_all()


def download_dragonfly_release(version):
path = f"/tmp/{tmp_file_name()}"
os.mkdir(path)
gzfile = f"{path}/dragonfly.tar.gz"
logging.debug(f"Downloading Dragonfly release into {gzfile}...")

# Download
urllib.request.urlretrieve(
f"https://github.com/dragonflydb/dragonfly/releases/download/{version}/dragonfly-x86_64.tar.gz",
gzfile,
)

# Extract
file = tarfile.open(gzfile)
file.extractall(path)
file.close()

# Return path
return f"{path}/dragonfly-x86_64"


@pytest.mark.parametrize(
"cluster_mode, announce_ip, announce_port",
[
("", "localhost", 7000),
("emulated", "", 0),
("emulated", "localhost", 7000),
],
)
async def test_replicate_old_master(
df_factory: DflyInstanceFactory, cluster_mode, announce_ip, announce_port
):
cpu = platform.processor()
if cpu != "x86_64":
pytest.skip(f"Supported only on x64, running on {cpu}")

dfly_version = "v1.19.2"
released_dfly_path = download_dragonfly_release(dfly_version)
master = df_factory.create(path=released_dfly_path, cluster_mode=cluster_mode)
replica = df_factory.create(
cluster_mode=cluster_mode, announce_ip=announce_ip, announce_port=announce_port
)

df_factory.start_all([master, replica])

c_master = master.client()
c_replica = replica.client()

assert (
f"df-{dfly_version}"
== (await c_master.execute_command("info", "server"))["dragonfly_version"]
)
assert "df-dev" == (await c_replica.execute_command("info", "server"))["dragonfly_version"]

await c_master.execute_command("set", "k1", "v1")

assert await c_replica.execute_command(f"REPLICAOF localhost {master.port}") == "OK"
await wait_available_async(c_replica)

assert await c_replica.execute_command("get", "k1") == "v1"

await disconnect_clients(c_master, c_replica)

0 comments on commit b6b42c9

Please sign in to comment.