Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination S3-V2: Bug: Honor path variables in bucket prefix #51039

Merged
merged 1 commit into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ interface PathFactory {
fun getPathMatcher(stream: DestinationStream, suffixPattern: String? = null): PathMatcher

val supportsStaging: Boolean
val prefix: String
val finalPrefix: String
}

data class PathMatcher(val regex: Regex, val variableToIndex: Map<String, Int>) {
Expand All @@ -63,23 +63,14 @@ class ObjectStoragePathFactory(
compressionConfigProvider: ObjectStorageCompressionConfigurationProvider<*>? = null,
private val timeProvider: TimeProvider,
) : PathFactory {
// Resolved configuration
private val pathConfig = pathConfigProvider.objectStoragePathConfiguration
override val supportsStaging: Boolean = pathConfig.usesStagingDirectory

// Resolved bucket path prefixes
private val stagingPrefixResolved =
pathConfig.stagingPrefix
?: Paths.get(pathConfig.prefix, DEFAULT_STAGING_PREFIX_SUFFIX).toString()
private val pathPatternResolved = pathConfig.pathSuffixPattern ?: DEFAULT_PATH_FORMAT
private val filePatternResolved = pathConfig.fileNamePattern ?: DEFAULT_FILE_FORMAT
private val fileFormatExtension =
formatConfigProvider?.objectStorageFormatConfiguration?.extension
private val compressionExtension =
compressionConfigProvider?.objectStorageCompressionConfiguration?.compressor?.extension
private val defaultExtension =
if (fileFormatExtension != null && compressionExtension != null) {
"$fileFormatExtension.$compressionExtension"
} else {
fileFormatExtension ?: compressionExtension
}

private val stagingPrefix: String
get() =
if (!pathConfig.usesStagingDirectory) {
Expand All @@ -89,15 +80,29 @@ class ObjectStoragePathFactory(
} else {
stagingPrefixResolved
}

override val supportsStaging: Boolean = pathConfig.usesStagingDirectory
override val prefix: String =
override val finalPrefix: String =
if (pathConfig.prefix.endsWith('/')) {
pathConfig.prefix.take(pathConfig.prefix.length - 1)
} else {
pathConfig.prefix
}

// Resolved path and filename patterns
private val pathPatternResolved = pathConfig.pathSuffixPattern ?: DEFAULT_PATH_FORMAT
private val filePatternResolved = pathConfig.fileNamePattern ?: DEFAULT_FILE_FORMAT

// Resolved file extensions
private val fileFormatExtension =
formatConfigProvider?.objectStorageFormatConfiguration?.extension
private val compressionExtension =
compressionConfigProvider?.objectStorageCompressionConfiguration?.compressor?.extension
private val defaultExtension =
if (fileFormatExtension != null && compressionExtension != null) {
"$fileFormatExtension.$compressionExtension"
} else {
fileFormatExtension ?: compressionExtension
}

/**
* Variable substitution is complex.
*
Expand Down Expand Up @@ -252,10 +257,17 @@ class ObjectStoragePathFactory(
}
}

/**
* This is to maintain parity with legacy code. Whether the path pattern ends with "/" is
* significant.
*
* * path: "{STREAM_NAME}/foo/" + "{part_number}{format_extension}" => "my_stream/foo/1.json"
* * path: "{STREAM_NAME}/foo" + "{part_number}{format_extension}" => "my_stream/foo1.json"
*/
private fun resolveRetainingTerminalSlash(prefix: String, path: String): String {
val asPath = Paths.get(prefix, path)
return if (path.endsWith('/')) {
asPath.toString() + "/"
"$asPath/"
} else {
asPath.toString()
}
Expand All @@ -265,26 +277,24 @@ class ObjectStoragePathFactory(
stream: DestinationStream,
substituteStreamAndNamespaceOnly: Boolean
): String {
val path =
getFormattedPath(
stream,
if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT
else PATH_VARIABLES
)
return resolveRetainingTerminalSlash(stagingPrefix, path)
return getFormattedPath(
stream,
if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT
else PATH_VARIABLES,
isStaging = true
)
}

override fun getFinalDirectory(
stream: DestinationStream,
substituteStreamAndNamespaceOnly: Boolean
): String {
val path =
getFormattedPath(
stream,
if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT
else PATH_VARIABLES
)
return resolveRetainingTerminalSlash(prefix, path)
return getFormattedPath(
stream,
if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT
else PATH_VARIABLES,
isStaging = false
)
}

override fun getLongestStreamConstantPrefix(
Expand Down Expand Up @@ -323,9 +333,11 @@ class ObjectStoragePathFactory(

private fun getFormattedPath(
stream: DestinationStream,
variables: List<PathVariable> = PATH_VARIABLES
variables: List<PathVariable> = PATH_VARIABLES,
isStaging: Boolean
): String {
val pattern = pathPatternResolved
val selectedPrefix = if (isStaging) stagingPrefix else finalPrefix
val pattern = resolveRetainingTerminalSlash(selectedPrefix, pathPatternResolved)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be simpler to pass this pattern in instead of passing down a boolean to replicate the branching?

val context = VariableContext(stream)
return variables.fold(pattern) { acc, variable -> variable.maybeApply(acc, context) }
}
Expand Down Expand Up @@ -375,13 +387,10 @@ class ObjectStoragePathFactory(
val pathVariableToPattern = getPathVariableToPattern(stream)
val variableToIndex = mutableMapOf<String, Int>()

val pathPattern = resolveRetainingTerminalSlash(finalPrefix, pathPatternResolved)

val replacedForPath =
buildPattern(
pathPatternResolved,
"""\\\$\{(\w+)}""",
pathVariableToPattern,
variableToIndex
)
buildPattern(pathPattern, """\\\$\{(\w+)}""", pathVariableToPattern, variableToIndex)
val replacedForFile =
buildPattern(
filePatternResolved,
Expand All @@ -391,12 +400,9 @@ class ObjectStoragePathFactory(
)
// NOTE: the old code does not actually resolve the path + filename,
// even though the documentation says it does.
val combined =
if (replacedForPath.startsWith('/')) {
"${prefix}$replacedForPath$replacedForFile"
} else {
"$prefix/$replacedForPath$replacedForFile"
}
val replacedForPathWithEmptyVariablesRemoved =
resolveRetainingTerminalSlash("", replacedForPath)
val combined = "$replacedForPathWithEmptyVariablesRemoved$replacedForFile"
val withSuffix =
if (suffixPattern != null) {
variableToIndex["suffix"] = variableToIndex.size + 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,41 @@ class ObjectStoragePathFactoryUTest {
assertNotNull(match2)
assertEquals(match2?.customSuffix, "-1")
}

@Test
fun `test file pattern with variable in prefix`() {
    // Both the final and the staging bucket prefix carry a literal `${NAMESPACE}` placeholder;
    // the factory is expected to substitute it when building file paths.
    // NOTE(review): positional argument meanings (prefix, staging prefix, path pattern,
    // file name pattern, staging flag) are presumed from the values -- confirm against
    // ObjectStoragePathConfiguration.
    val pathConfig =
        ObjectStoragePathConfiguration(
            "prefix-\${NAMESPACE}",
            "staging-\${NAMESPACE}",
            "\${STREAM_NAME}/",
            "any_filename",
            true,
        )
    every { pathConfigProvider.objectStoragePathConfiguration } returns pathConfig

    val pathFactory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider)

    // Final location: the variable inside the final prefix must be resolved.
    val finalPath = pathFactory.getPathToFile(stream, 0L, isStaging = false)
    assertEquals("prefix-test/stream/any_filename", finalPath)

    // Staging location: the variable inside the staging prefix must be resolved too.
    val stagingPath = pathFactory.getPathToFile(stream, 0L, isStaging = true)
    assertEquals("staging-test/stream/any_filename", stagingPath)
}

@Test
fun `test pattern matcher with variable in prefix`() {
    // The bucket prefix carries a literal `${NAMESPACE}` placeholder; the matcher built by
    // the factory is expected to match keys in which that placeholder has been resolved.
    val pathConfig =
        ObjectStoragePathConfiguration(
            "prefix-\${NAMESPACE}",
            "staging-\${NAMESPACE}",
            "\${STREAM_NAME}/",
            "any_filename",
            true,
        )
    every { pathConfigProvider.objectStoragePathConfiguration } returns pathConfig

    val pathFactory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider)
    val pathMatcher = pathFactory.getPathMatcher(stream, "(-foo)?")

    // The suffix pattern is optional, so keys must match both without and with "-foo".
    // The keys are checked in the same order as before: bare key first, suffixed key second.
    listOf(
            "prefix-test/stream/any_filename",
            "prefix-test/stream/any_filename-foo",
        )
        .forEach { objectKey -> assertNotNull(pathMatcher.match(objectKey)) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class ObjectStorageDestinationStateTest {
): List<Triple<Int, String, Long>> {
val genIdKey = ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY
val prefix =
"${d.pathFactory.prefix}/${stream.descriptor.namespace}/${stream.descriptor.name}"
"${d.pathFactory.finalPrefix}/${stream.descriptor.namespace}/${stream.descriptor.name}"
val generations =
listOf(
Triple(0, "$prefix/key1-0", 0L),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ open class MockPathFactory : PathFactory {

override val supportsStaging: Boolean
get() = doSupportStaging
override val prefix: String
override val finalPrefix: String
get() = "prefix"

private fun fromStream(stream: DestinationStream): String {
Expand All @@ -28,14 +28,14 @@ open class MockPathFactory : PathFactory {
stream: DestinationStream,
substituteStreamAndNamespaceOnly: Boolean
): String {
return "$prefix/staging/${fromStream(stream)}"
return "$finalPrefix/staging/${fromStream(stream)}"
}

override fun getFinalDirectory(
stream: DestinationStream,
substituteStreamAndNamespaceOnly: Boolean
): String {
return "$prefix/${fromStream(stream)}"
return "$finalPrefix/${fromStream(stream)}"
}

override fun getPathToFile(
Expand Down Expand Up @@ -66,7 +66,7 @@ open class MockPathFactory : PathFactory {
return PathMatcher(
regex =
Regex(
"$prefix/(${stream.descriptor.namespace})/(${stream.descriptor.name})/(.*)-(.*)$"
"$finalPrefix/(${stream.descriptor.namespace})/(${stream.descriptor.name})/(.*)-(.*)$"
),
variableToIndex = mapOf("part_number" to 4)
)
Expand Down
Loading