-
Notifications
You must be signed in to change notification settings - Fork 95
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Better cleaning of allocated resources #7280
base: staging
Are you sure you want to change the base?
Conversation
- Added invocation of ModelClassLoader.classLoader.close inside ModelData.close - Added invocation of StreamExecutionEnvironment.close inside FlinkTestMain and FlinkVerificationMain and FlinkMiniClusterTableOperations.parseTestRecords - Added debug logs for easier further investigations of memory leaks
📝 Walkthrough📝 WalkthroughWalkthroughThe pull request introduces several updates across multiple files, primarily focusing on enhancing resource management and logging capabilities. Key changes include the addition of new entries in the Possibly related PRs
Suggested labels
Suggested reviewers
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (3)
engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala (1)
44-58
: Consider using Scala's Using utility for cleaner resource managementThe current nested try-finally blocks could be simplified using Scala's Using utility, which would make the code more concise and maintainable.
Here's a suggested refactor:
def runTest(): Unit = { - val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener - try { - val resultCollector = new TestServiceInvocationCollector(collectingListener) - val registrar = prepareRegistrar() - val env = createEnv - - try { - registrar.register(env, process, processVersion, deploymentData, resultCollector) - execute(env, SavepointRestoreSettings.forPath(savepointPath, true)) - } finally { - logger.debug(s"Closing LocalEnvironment for model with classpath: ${modelData.modelClassLoader}") - env.close() - } - } finally { - collectingListener.close() - } + Using.Manager { use => + val collectingListener = use(ResultsCollectingListenerHolder.registerTestEngineListener) + val resultCollector = new TestServiceInvocationCollector(collectingListener) + val registrar = prepareRegistrar() + val env = use(createEnv) + + registrar.register(env, process, processVersion, deploymentData, resultCollector) + execute(env, SavepointRestoreSettings.forPath(savepointPath, true)) + + logger.debug(s"Successfully closed LocalEnvironment for model with classpath: ${modelData.modelClassLoader}") + }.get // or better error handling }This approach:
- Eliminates nested try-finally blocks
- Automatically handles resource cleanup
- Makes the code more idiomatic Scala
- Improves readability and maintainability
engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/FlinkMiniClusterTableOperations.scala (2)
28-37
: Consider enhancing error handling in the parseTestRecords method.While the resource management is improved with
Using.resource
, there are a few areas that could be strengthened:
- The
writeRecordsToFile
call could throw an exception that might leave the table environment in an inconsistent state- The cleanup method silently handles errors, which might mask important issues
Consider wrapping the core logic in a Try block and propagating errors:
Using.resource(MiniClusterEnvBuilder.createLocalStreamEnv) { streamEnv => implicit val tableEvn: StreamTableEnvironment = MiniClusterEnvBuilder.createTableStreamEnv(streamEnv) val (inputTablePath, inputTableName) = createTempFileTable(schema) try { - writeRecordsToFile(inputTablePath, records) - val inputTable = tableEvn.from(s"`$inputTableName`") - tableEvn.toDataStream(inputTable).executeAndCollect().asScala.toList + Try { + writeRecordsToFile(inputTablePath, records) + val inputTable = tableEvn.from(s"`$inputTableName`") + tableEvn.toDataStream(inputTable).executeAndCollect().asScala.toList + }.recoverWith { case e => + logger.error("Failed to parse test records", e) + Failure(e) + }.get } finally { cleanup(inputTablePath) } }
189-189
: Consider adding explicit cleanup for TableEnvironment.The
createTableEnv
method creates a new TableEnvironment instance. While TableEnvironment implements AutoCloseable, it's good practice to document this requirement.Add a comment to document the cleanup responsibility:
+ // Returns a TableEnvironment that must be closed by the caller def createTableEnv: TableEnvironment = TableEnvironment.create(tableEnvConfig)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (7)
docs/Changelog.md
(1 hunks)engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/FlinkMiniClusterTableOperations.scala
(3 hunks)engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala
(2 hunks)engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala
(3 hunks)engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala
(3 hunks)scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala
(1 hunks)utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/loader/ModelClassLoader.scala
(3 hunks)
✅ Files skipped from review due to trivial changes (1)
- docs/Changelog.md
🔇 Additional comments (10)
engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala (2)
3-3
: LGTM: Good addition of logging capabilities
The extension of LazyLogging
trait and related imports enhance the class's debugging capabilities, which aligns well with the PR's objective of improving resource management traceability.
Also applies to: 15-15, 39-40
44-58
: LGTM: Improved resource management with proper cleanup
The nested try-finally blocks ensure proper cleanup of both the environment and collecting listener, with added debug logging for better traceability.
engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala (3)
25-25
: LGTM! Good practice for logging integration
The addition of LazyLogging using self-type annotation is a clean way to mix in logging capabilities.
34-34
: LGTM! Helpful debug logging
The addition of debug logging for the model classpath will aid in troubleshooting classpath-related issues.
33-42
: 🛠️ Refactor suggestion
Consider adding StreamExecutionEnvironment cleanup
Given the PR's focus on resource cleanup, consider ensuring the StreamExecutionEnvironment is properly closed when no longer needed. This aligns with the changes made in other files like FlinkTestMain and FlinkVerificationMain.
Consider wrapping the environment creation and usage in a try-with-resources block or ensuring it's closed in the calling code:
protected def createEnv: StreamExecutionEnvironment = {
logger.debug(s"Creating LocalEnvironment for model with classpath: ${modelData.modelClassLoader}")
- StreamExecutionEnvironment.createLocalEnvironment(
+ val env = StreamExecutionEnvironment.createLocalEnvironment(
MetaDataExtractor
.extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
.parallelism
.getOrElse(1),
configuration
)
+ // Register a shutdown hook to ensure cleanup
+ Runtime.getRuntime.addShutdownHook(new Thread(() => env.close()))
+ env
}
Let's verify if the environment is properly closed in the calling code:
engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala (2)
3-3
: LGTM: LazyLogging trait addition
The addition of LazyLogging trait is well-placed and aligns with the PR's objective of improving debugging capabilities for resource management.
Also applies to: 57-58
67-74
: Verify resource cleanup ordering and completeness
The addition of the nested try-finally block ensures proper cleanup of the Flink environment. The debug logging will help track resource lifecycle.
Consider documenting the resource cleanup order in comments to make the cleanup strategy more explicit.
Let's verify if there are other resources that might need similar cleanup:
✅ Verification successful
Resource cleanup is properly ordered and complete
The nested try-finally blocks ensure proper cleanup order:
- Inner try-finally: Handles environment cleanup with
env.close()
after registration/execution - Outer try-finally: Handles any other cleanup (e.g., collecting listener results)
This structure guarantees environment cleanup even if registration or execution fails, while maintaining the existing cleanup in the outer block. No additional resource cleanup is needed.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for resource allocation patterns that might need cleanup
rg -t scala "new|create.*Env|\.open" --no-filename -A 5 | grep -v "\.close"
# Look for existing cleanup patterns to ensure consistency
ast-grep --pattern 'try {
$$$
} finally {
$$$
}'
Length of output: 114884
Script:
#!/bin/bash
# Let's try a more focused search for resource cleanup patterns in the specific file
rg -t scala "try|finally|close|env" engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala
Length of output: 394
utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/loader/ModelClassLoader.scala (2)
5-5
: LGTM: Appropriate trait and import additions
The addition of LazyLogging
trait and Closeable
import supports the new resource management functionality.
Also applies to: 9-9
80-85
: LGTM: Improved logging and readability
The addition of debug logging and intermediate variable assignment enhances traceability and code clarity.
engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/FlinkMiniClusterTableOperations.scala (1)
191-195
: Verify StreamExecutionEnvironment cleanup in createLocalStreamEnv.
The createLocalStreamEnv
method creates a new environment but doesn't ensure proper cleanup. This relates to the TODO comment about memory leaks in the minicluster.
Let's verify if there are other instances where StreamExecutionEnvironment might not be properly closed:
Consider implementing a custom AutoCloseable wrapper for StreamExecutionEnvironment to ensure proper cleanup:
class ManagedStreamExecutionEnvironment private (val env: StreamExecutionEnvironment) extends AutoCloseable {
override def close(): Unit = env.close()
}
object ManagedStreamExecutionEnvironment {
def create(config: Configuration): ManagedStreamExecutionEnvironment =
new ManagedStreamExecutionEnvironment(StreamExecutionEnvironment.createLocalEnvironment(config))
}
def close(): Unit = { | ||
classLoader match { | ||
case closeable: Closeable => | ||
logger.debug(s"$toString: Closing Closeable classloader") | ||
closeable.close() | ||
case _ => | ||
logger.debug(s"$toString: Classloader ${classLoader.getClass.getName} is not Closeable, skipping close") | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add exception handling to close() method
While the implementation is good, it should handle potential exceptions during close operation to prevent them from propagating up the call stack.
Consider this safer implementation:
def close(): Unit = {
classLoader match {
case closeable: Closeable =>
logger.debug(s"$toString: Closing Closeable classloader")
- closeable.close()
+ try {
+ closeable.close()
+ } catch {
+ case e: Exception =>
+ logger.warn(s"$toString: Failed to close classloader", e)
+ }
case _ =>
logger.debug(s"$toString: Classloader ${classLoader.getClass.getName} is not Closeable, skipping close")
}
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def close(): Unit = { | |
classLoader match { | |
case closeable: Closeable => | |
logger.debug(s"$toString: Closing Closeable classloader") | |
closeable.close() | |
case _ => | |
logger.debug(s"$toString: Classloader ${classLoader.getClass.getName} is not Closeable, skipping close") | |
} | |
} | |
def close(): Unit = { | |
classLoader match { | |
case closeable: Closeable => | |
logger.debug(s"$toString: Closing Closeable classloader") | |
try { | |
closeable.close() | |
} catch { | |
case e: Exception => | |
logger.warn(s"$toString: Failed to close classloader", e) | |
} | |
case _ => | |
logger.debug(s"$toString: Classloader ${classLoader.getClass.getName} is not Closeable, skipping close") | |
} | |
} |
designerDictServices.close() | ||
modelClassLoader.close() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Enhance exception handling in close() method
The close implementation should handle exceptions from both designerDictServices.close()
and modelClassLoader.close()
to ensure both resources are attempted to be closed even if one fails.
Consider this more robust implementation:
final def close(): Unit = {
- designerDictServices.close()
- modelClassLoader.close()
+ var exceptions: List[Throwable] = Nil
+ try {
+ designerDictServices.close()
+ } catch {
+ case e: Exception =>
+ logger.warn("Failed to close designerDictServices", e)
+ exceptions = e :: exceptions
+ }
+ try {
+ modelClassLoader.close()
+ } catch {
+ case e: Exception =>
+ logger.warn("Failed to close modelClassLoader", e)
+ exceptions = e :: exceptions
+ }
+ if (exceptions.nonEmpty) {
+ throw new Exception("Failed to close one or more resources", exceptions.head)
+ }
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
designerDictServices.close() | |
modelClassLoader.close() | |
} | |
final def close(): Unit = { | |
var exceptions: List[Throwable] = Nil | |
try { | |
designerDictServices.close() | |
} catch { | |
case e: Exception => | |
logger.warn("Failed to close designerDictServices", e) | |
exceptions = e :: exceptions | |
} | |
try { | |
modelClassLoader.close() | |
} catch { | |
case e: Exception => | |
logger.warn("Failed to close modelClassLoader", e) | |
exceptions = e :: exceptions | |
} | |
if (exceptions.nonEmpty) { | |
throw new Exception("Failed to close one or more resources", exceptions.head) | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/testing/LocalModelData.scala (1)
121-123
: Consider enhancing the documentation using ScalaDoc format.The implementation correctly prevents closing of the AppClassLoader, which aligns with proper resource management. However, the documentation could be more IDE-friendly and detailed.
Consider updating the documentation to:
- // For LocalModelData we can't close classloader because it is used not a dedicated classloader in this case - // but AppClassLoader + /** + * Indicates whether the model's ClassLoader should be closed during cleanup. + * + * @return false - We don't close the ClassLoader in LocalModelData because it uses the system's + * AppClassLoader rather than a dedicated ClassLoader. Closing AppClassLoader would be unsafe + * as it's shared across the JVM. + */ override protected def shouldCloseClassLoader: Boolean = false
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala
(1 hunks)scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/testing/LocalModelData.scala
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala
🔇 Additional comments (1)
scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/testing/LocalModelData.scala (1)
121-124
: Verify the integration with ModelData.close()
Let's ensure this change properly integrates with the parent class's resource management.
✅ Verification successful
Change is properly integrated with ModelData.close()
The verification shows that:
- The parent
ModelData
class has a proper close() implementation that checksshouldCloseClassLoader
before closing LocalModelData
correctly overridesshouldCloseClassLoader
to return false- The classloader is only closed in
ModelData.close()
whenshouldCloseClassLoader
is true - No other unexpected usages of
modelClassLoader.close()
were found
The change is safe and properly integrated with the parent class's resource management.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the integration with ModelData's close() method and find any assumptions about classloader closure
# Check the parent ModelData class implementation
echo "Checking ModelData implementation:"
rg -A 10 "class ModelData"
# Look for any direct calls to modelClassLoader.close
echo "Checking for direct classloader close calls:"
rg "modelClassLoader.close"
# Look for any tests that might be making assumptions about classloader closure
echo "Checking test assumptions:"
rg -t scala "shouldCloseClassLoader|modelClassLoader.close" ".*Test.*scala"
Length of output: 3767
Script:
#!/bin/bash
# Let's check the ModelData class implementation more specifically
echo "Checking ModelData class and close() implementation:"
rg -A 20 "abstract class ModelData"
# Check LocalModelData's parent class references
echo "Checking LocalModelData class hierarchy:"
rg -A 10 "class LocalModelData"
# Look for any other implementations of shouldCloseClassLoader
echo "Checking other shouldCloseClassLoader implementations:"
ast-grep --pattern 'override protected def shouldCloseClassLoader'
Length of output: 1997
Script:
#!/bin/bash
# Let's check the base ModelData class implementation and close method
echo "Checking ModelData.scala content:"
rg -l "class ModelData" | xargs cat
# Check for any usages of close() method on ModelData
echo "Checking ModelData.close() usages:"
rg "\.close\(\)" -B 3 -A 1
# Look for any test files related to ModelData closure
echo "Checking test files for ModelData closure:"
fd -e scala -e java | rg -l "ModelData.*Test|Test.*ModelData"
Length of output: 125504
Script:
#!/bin/bash
# Let's check the ModelData class implementation and close() method specifically
echo "Checking ModelData.scala for close() implementation:"
rg -A 10 "def close\(\)" scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala
# Check for any other implementations of shouldCloseClassLoader
echo "Checking for shouldCloseClassLoader implementations:"
rg -A 5 "def shouldCloseClassLoader"
# Check for any usages of modelClassLoader.close()
echo "Checking modelClassLoader.close() usages:"
rg "modelClassLoader\.close\(\)"
Length of output: 1427
Describe your changes
Checklist before merge
Summary by CodeRabbit
Release Notes for Version 1.19.0
New Features
Bug Fixes
Enhancements
ModelData
.Chores