Commit 0d7094f

Merge pull request #199 from JetBrains-Research/JBAI-4393-memory-control
dmitriyb authored Sep 2, 2024
2 parents 613c5a7 + 61011f8 commit 0d7094f
Showing 49 changed files with 862 additions and 604 deletions.
@@ -56,6 +56,7 @@ fun KotlinJvmTarget.configureBenchmarkTests() {
     group = "verification"

     maxHeapSize = "4G"
+    systemProperty("kotlinx.coroutines.debug", "off")

     useJUnitPlatform()

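For context, the configured benchmark task boils down to the following plain Gradle Kotlin DSL. This is a minimal sketch for a build.gradle.kts, assuming a JVM Test task registered as "benchmark"; the task name and registration are assumptions, only the four settings inside the braces come from the hunk above.

```kotlin
tasks.register<Test>("benchmark") {
    group = "verification"
    maxHeapSize = "4G"
    // Disabling the kotlinx-coroutines debug agent avoids stack-trace
    // capture overhead that could skew benchmark timings.
    systemProperty("kotlinx.coroutines.debug", "off")
    useJUnitPlatform()
}
```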
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
@@ -11,6 +11,7 @@ okio = "3.6.0"
 onnxruntime = "1.17.0.patched-1"
 slf4j = "2.0.9"
 wire = "4.9.3"
+fastutil = "8.5.14"

 # JS Dependencies
 loglevel = "1.8.1"
@@ -36,3 +37,4 @@ onnxruntime-gpu = { module = "com.microsoft.onnxruntime:onnxruntime_gpu", version.ref = "onnxruntime" }
 slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
 slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
 wire-runtime = { module = "com.squareup.wire:wire-runtime", version.ref = "wire" }
+fastutil-core = { module = "it.unimi.dsi:fastutil-core", version.ref = "fastutil" }
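A consuming module would then pull the library in through Gradle's generated type-safe accessor; a minimal sketch assuming standard version-catalog usage in a build.gradle.kts:

```kotlin
dependencies {
    // Gradle derives the accessor libs.fastutil.core from the catalog key "fastutil-core"
    implementation(libs.fastutil.core)
}
```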
@@ -10,30 +10,19 @@ import io.kinference.core.optimizer.rules.OptimizerRuleSet
 import io.kinference.data.ONNXData
 import io.kinference.data.ONNXDataType
 import io.kinference.model.IrOptimizableEngine
-import io.kinference.ndarray.arrays.memory.MemoryLimiter
-import io.kinference.ndarray.arrays.memory.MemoryLimiters
 import io.kinference.optimizer.GraphOptimizer
 import io.kinference.optimizer.OptimizerRule
 import io.kinference.protobuf.*
 import io.kinference.protobuf.message.*
 import io.kinference.utils.CommonDataLoader
-import io.kinference.utils.PlatformUtils
+import io.kinference.utils.PredictionConfig
+import io.kinference.utils.PredictionConfigs
 import okio.Buffer
 import okio.Path
 import okio.Path.Companion.toPath

 typealias KIONNXData<T> = ONNXData<T, CoreBackend>

-// Define an interface for allocation control marking output
-internal interface KIONNXDataArraysReleaser {
-    fun markOutput()
-}
-
-internal fun <T> KIONNXData<T>.markOutput() {
-    if (this is KIONNXDataArraysReleaser)
-        this.markOutput()
-}
-
 object CoreBackend : BackendInfo(name = "KInference Core CPU Backend")

 /**
@@ -51,37 +40,37 @@ object KIEngine : IrOptimizableEngine<KIONNXData<*>> {

     fun protoReader(bytes: ByteArray) = ProtobufReader(Buffer().write(bytes), KI_READER_CONFIG)

-    suspend fun loadModel(bytes: ByteArray, optimize: Boolean, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel {
+    suspend fun loadModel(bytes: ByteArray, optimize: Boolean, predictionConfig: PredictionConfig): KIModel {
         val rules = if (optimize) OptimizerRuleSet.DEFAULT_OPT_RULES else emptyList()
-        return loadModel(bytes, rules, memoryLimiter, parallelismLimit)
+        return loadModel(bytes, rules, predictionConfig)
     }

     override suspend fun loadModel(bytes: ByteArray, optimize: Boolean): KIModel {
-        return loadModel(bytes, optimize, MemoryLimiters.NoAllocator, PlatformUtils.cores)
+        return loadModel(bytes, optimize, PredictionConfigs.NoAllocator)
     }

-    override suspend fun loadModel(bytes: ByteArray, rules: List<OptimizerRule<KIONNXData<*>>>): KIModel = loadModel(bytes, rules, MemoryLimiters.NoAllocator, PlatformUtils.cores)
+    override suspend fun loadModel(bytes: ByteArray, rules: List<OptimizerRule<KIONNXData<*>>>): KIModel = loadModel(bytes, rules, PredictionConfigs.NoAllocator)

-    suspend fun loadModel(bytes: ByteArray, rules: List<OptimizerRule<KIONNXData<*>>>, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel {
+    suspend fun loadModel(bytes: ByteArray, rules: List<OptimizerRule<KIONNXData<*>>>, predictionConfig: PredictionConfig): KIModel {
         val modelScheme = ModelProto.decode(protoReader(bytes))
-        val model = KIModel(modelScheme, memoryLimiter)
+        val model = KIModel(modelScheme, predictionConfig)

         return if (rules.isNotEmpty()) {
             val newGraph = GraphOptimizer(model.graph).run(rules) as KIGraph
-            KIModel(model.id, model.name, model.opSet, newGraph, memoryLimiter, parallelismLimit)
+            KIModel(model.id, model.name, model.opSet, newGraph, predictionConfig)
         } else {
             model
         }
     }

     override suspend fun loadModel(bytes: ByteArray): KIModel = loadModel(bytes, optimize = true)

-    suspend fun loadModel(path: Path, optimize: Boolean, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel {
-        return loadModel(CommonDataLoader.bytes(path), optimize, memoryLimiter, parallelismLimit)
+    suspend fun loadModel(path: Path, optimize: Boolean, predictionConfig: PredictionConfig): KIModel {
+        return loadModel(CommonDataLoader.bytes(path), optimize, predictionConfig)
     }

     override suspend fun loadModel(path: Path, optimize: Boolean): KIModel {
-        return loadModel(path, optimize, MemoryLimiters.NoAllocator, PlatformUtils.cores)
+        return loadModel(path, optimize, PredictionConfigs.NoAllocator)
     }

     override suspend fun loadModel(path: Path): KIModel = loadModel(path, optimize = true)
@@ -90,12 +79,12 @@ object KIEngine : IrOptimizableEngine<KIONNXData<*>> {
         return loadModel(CommonDataLoader.bytes(path), rules)
     }

-    suspend fun loadModel(path: String, optimize: Boolean, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel {
-        return loadModel(CommonDataLoader.bytes(path.toPath()), optimize, memoryLimiter, parallelismLimit)
+    suspend fun loadModel(path: String, optimize: Boolean, predictionConfig: PredictionConfig): KIModel {
+        return loadModel(CommonDataLoader.bytes(path.toPath()), optimize, predictionConfig)
     }

     override suspend fun loadModel(path: String, optimize: Boolean): KIModel {
-        return loadModel(path, optimize, MemoryLimiters.NoAllocator, PlatformUtils.cores)
+        return loadModel(path, optimize, PredictionConfigs.NoAllocator)
     }

     override suspend fun loadModel(path: String): KIModel = loadModel(path, optimize = true)
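Taken together, these KIEngine changes collapse the old (memoryLimiter, parallelismLimit) pair into a single PredictionConfig argument. A hedged usage sketch built only from signatures visible in this diff; "model.onnx" is a placeholder path and the main function is scaffolding:

```kotlin
import io.kinference.core.KIEngine
import io.kinference.utils.PredictionConfigs

suspend fun main() {
    // The parameterless overloads now delegate to PredictionConfigs.NoAllocator.
    val model = KIEngine.loadModel("model.onnx", optimize = true, predictionConfig = PredictionConfigs.NoAllocator)
    try {
        // ... run predictions here ...
    } finally {
        model.close()
    }
}
```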
@@ -9,7 +9,7 @@ import io.kinference.protobuf.message.TensorProto
 import io.kinference.types.ValueInfo
 import io.kinference.types.ValueTypeInfo

-class KIONNXMap(name: String?, data: Map<Any, KIONNXData<*>>, val info: ValueTypeInfo.MapTypeInfo) : ONNXMap<Map<Any, KIONNXData<*>>, CoreBackend>(name, data), KIONNXDataArraysReleaser {
+class KIONNXMap(name: String?, data: Map<Any, KIONNXData<*>>, val info: ValueTypeInfo.MapTypeInfo) : ONNXMap<Map<Any, KIONNXData<*>>, CoreBackend>(name, data) {
     constructor(data: Map<Any, KIONNXData<*>>, info: ValueInfo) : this(info.name, data, info.typeInfo as ValueTypeInfo.MapTypeInfo)

     override val backend = CoreBackend
@@ -26,10 +26,6 @@ class KIONNXMap(name: String?, data: Map<Any, KIONNXData<*>>, val info: ValueTyp

     override fun rename(name: String): KIONNXMap = KIONNXMap(name, data, info)

-    override fun markOutput() {
-        data.values.forEach { it.markOutput() }
-    }
-
     override suspend fun clone(newName: String?): KIONNXMap {
         val newMap = HashMap<Any, KIONNXData<*>>(data.size)
         for ((key, value) in data.entries) {
@@ -7,7 +7,7 @@ import io.kinference.data.ONNXSequence
 import io.kinference.protobuf.message.SequenceProto
 import io.kinference.types.*

-class KIONNXSequence(name: String?, data: List<KIONNXData<*>>, val info: ValueTypeInfo.SequenceTypeInfo) : ONNXSequence<List<KIONNXData<*>>, CoreBackend>(name, data), KIONNXDataArraysReleaser {
+class KIONNXSequence(name: String?, data: List<KIONNXData<*>>, val info: ValueTypeInfo.SequenceTypeInfo) : ONNXSequence<List<KIONNXData<*>>, CoreBackend>(name, data) {
     constructor(name: String?, info: ValueTypeInfo.SequenceTypeInfo, size: Int, init: (Int) -> KIONNXData<*>) : this(name, List(size, init), info)
     constructor(data: List<KIONNXData<*>>, info: ValueInfo) : this(info.name, data, info.typeInfo as ValueTypeInfo.SequenceTypeInfo)

@@ -23,10 +23,6 @@ class KIONNXSequence(name: String?, data: List<KIONNXData<*>>, val info: ValueTy

     override fun rename(name: String): KIONNXSequence = KIONNXSequence(name, data, info)

-    override fun markOutput() {
-        data.forEach { it.markOutput() }
-    }
-
     val length: Int = data.size

     companion object {
@@ -1,9 +1,9 @@
 package io.kinference.core.data.tensor

-import io.kinference.core.CoreBackend
-import io.kinference.core.KIONNXDataArraysReleaser
+import io.kinference.core.*
 import io.kinference.data.ONNXTensor
 import io.kinference.ndarray.arrays.*
+import io.kinference.ndarray.arrays.memory.contexts.ManualAllocatorContext
 import io.kinference.ndarray.arrays.tiled.*
 import io.kinference.protobuf.FLOAT_TENSOR_TYPES
 import io.kinference.protobuf.message.TensorProto
@@ -13,22 +13,18 @@ import io.kinference.types.ValueTypeInfo

 //TODO: support segments
 //TODO: support external data
-class KITensor(name: String?, override val data: NDArrayCore, val info: ValueTypeInfo.TensorTypeInfo) : ONNXTensor<NDArrayCore, CoreBackend>(name, data), KIONNXDataArraysReleaser {
+class KITensor(name: String?, override val data: NDArrayCore, val info: ValueTypeInfo.TensorTypeInfo, private var context: ManualAllocatorContext? = null) : ONNXTensor<NDArrayCore, CoreBackend>(name, data) {
     constructor(data: NDArrayCore, info: ValueInfo) : this(info.name, data, info.typeInfo as ValueTypeInfo.TensorTypeInfo)

     override suspend fun close() {
+        context?.returnNDArray(data)
         data.close()
     }

     override suspend fun clone(newName: String?): KITensor {
         return KITensor(newName, data.clone(), info)
     }

-    override fun markOutput() {
-        if (this.data is MemoryControlledArray)
-            data.markOutput()
-    }
-
     suspend operator fun minus(other: KITensor): KITensor {
         require(this.data is NumberNDArrayCore && other.data is NumberNDArrayCore)
         return (this.data - other.data).asTensor()
@@ -47,7 +43,7 @@ class KITensor(name: String?, override val data: NDArrayCore, val info: ValueTyp
     override val backend = CoreBackend

     override fun rename(name: String): KITensor {
-        return KITensor(name, data, info)
+        return KITensor(name, data, info, context)
     }

     companion object {
@@ -1,16 +1,17 @@
 package io.kinference.core.data.tensor

 import io.kinference.ndarray.arrays.*
+import io.kinference.ndarray.arrays.memory.contexts.ManualAllocatorContext
 import io.kinference.ndarray.extensions.concat
 import io.kinference.ndarray.extensions.splitWithAxis
 import io.kinference.primitives.types.DataType
 import io.kinference.protobuf.resolveProtoDataType
 import io.kinference.types.TensorShape
 import io.kinference.types.ValueTypeInfo

-fun NDArrayCore.asTensor(name: String? = null) = KITensor(name, this, ValueTypeInfo.TensorTypeInfo(TensorShape(this.shape), type.resolveProtoDataType()))
+fun NDArrayCore.asTensor(name: String? = null, context: ManualAllocatorContext? = null) = KITensor(name, this, ValueTypeInfo.TensorTypeInfo(TensorShape(this.shape), type.resolveProtoDataType()), context)

-internal fun <T : NDArray> T.asTensor(name: String? = null) = (this as NDArrayCore).asTensor(name)
+internal fun <T : NDArray> T.asTensor(name: String? = null, context: ManualAllocatorContext? = null) = (this as NDArrayCore).asTensor(name, context)

 internal fun <T : NDArray> Collection<T>.asONNXTensors(names: List<String>): List<KITensor> {
     return this.zip(names).map { (data, name) -> data.asTensor(name) }
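With the extra parameter, an operator can thread a ManualAllocatorContext through to the tensor it produces, so that closing the tensor returns the backing array to the pool. A minimal sketch: compute and its cloning body are hypothetical, while asTensor(name, context) and the close() behavior come from this diff.

```kotlin
import io.kinference.core.data.tensor.*
import io.kinference.ndarray.arrays.NDArrayCore
import io.kinference.ndarray.arrays.memory.contexts.ManualAllocatorContext

suspend fun compute(input: KITensor, context: ManualAllocatorContext? = null): KITensor {
    val result: NDArrayCore = input.data.clone() // stand-in for the actual computation
    // The returned tensor remembers the context; KITensor.close() will call
    // context.returnNDArray(data) before closing the array.
    return result.asTensor("output", context)
}
```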
@@ -1,8 +1,7 @@
 package io.kinference.core.model

-import io.kinference.core.KIONNXData
+import io.kinference.core.*
 import io.kinference.core.graph.KIGraph
-import io.kinference.core.markOutput
 import io.kinference.graph.Contexts
 import io.kinference.model.Model
 import io.kinference.ndarray.arrays.memory.*
@@ -18,14 +17,10 @@ class KIModel(
     val name: String,
     val opSet: OperatorSetRegistry,
     val graph: KIGraph,
-    memoryLimiter: MemoryLimiter = MemoryLimiters.NoAllocator,
-    parallelismLimit: Int = PlatformUtils.cores,
+    predictionConfig: PredictionConfig = PredictionConfigs.NoAllocator,
 ) : Model<KIONNXData<*>>, Profilable, Cacheable {
     private val profiles: MutableList<ProfilingContext> = ArrayList()
-
-    @OptIn(ExperimentalCoroutinesApi::class)
-    private val dispatcher: CoroutineDispatcher = Dispatchers.Default.limitedParallelism(parallelismLimit)
-    private val modelArrayStorage: ModelArrayStorage = ModelArrayStorage(memoryLimiter)
+    private val predictionContextDispatcher: PredictionContextDispatcher = PredictionContextDispatcher(predictionConfig)

     override fun addProfilingContext(name: String): ProfilingContext = ProfilingContext(name).apply { profiles.add(this) }
     override fun analyzeProfilingResults(): ProfileAnalysisEntry = profiles.analyze("Model $name")
@@ -37,24 +32,22 @@ class KIModel(
             if (profile) addProfilingContext("Model $name") else null
         )

-        val limiterContext = ParallelismLimiterContext(dispatcher)
         var coreReserved = false
         val results = try {
             withContext(NonCancellable) {
                 ResourcesDispatcher.reserveCore()
                 coreReserved = true
             }

-            val allocatorContext = modelArrayStorage.createAllocatorContext()
-            val mixedContext = allocatorContext + limiterContext
-
-            withContext(mixedContext) {
-                val coroutineContext = coroutineContext[AllocatorContext.Key]!!
-                val execResult = graph.execute(input, contexts)
-                execResult.forEach { it.markOutput() }
-                coroutineContext.closeAllocated()
-                execResult
-            }
+            val predictionContext = predictionContextDispatcher.getPredictionContext()
+            val output = if (predictionContextDispatcher.allocationMode != AllocationMode.Auto) withContext(predictionContext) {
+                return@withContext graph.execute(input, contexts)
+            } else withContext(predictionContext) {
+                return@withContext graph.execute(input, contexts).map { it.clone(it.name) }.toList()
+            }
+
+            predictionContextDispatcher.returnStorage(predictionContext)
+            output
         } finally {
             if (coreReserved) {
                 ResourcesDispatcher.releaseCore()
@@ -66,11 +59,11 @@ class KIModel(

     override suspend fun close() {
         graph.close()
-        modelArrayStorage.close()
+        predictionContextDispatcher.close()
     }

     override fun clearCache() {
-        modelArrayStorage.clearCache()
+        predictionContextDispatcher.clearCache()
     }

     companion object {
@@ -80,14 +73,13 @@ class KIModel(

         suspend operator fun invoke(
             proto: ModelProto,
-            memoryLimiter: MemoryLimiter = MemoryLimiters.NoAllocator,
-            limiterParallelismCounter: Int = PlatformUtils.cores,
+            predictionConfig: PredictionConfig = PredictionConfigs.NoAllocator,
         ): KIModel {
             val name = "${proto.domain}:${proto.modelVersion}"
             val id = "$name:${generateModelId()}"
             val opSet = OperatorSetRegistry(proto.opSetImport)
             val graph = KIGraph(proto.graph!!, opSet)
-            return KIModel(id, name, opSet, graph, memoryLimiter, limiterParallelismCounter)
+            return KIModel(id, name, opSet, graph, predictionConfig)
         }
     }
 }
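End to end, construction and prediction with the new config look roughly as follows. A sketch assuming Model.predict takes the input list with profile defaulting to false; the constructor defaults, the predict internals, and close() are what this diff actually shows.

```kotlin
suspend fun run(proto: ModelProto, inputs: List<KIONNXData<*>>) {
    val model = KIModel(proto, predictionConfig = PredictionConfigs.NoAllocator)
    try {
        // predict() reserves a core, executes the graph under the prediction
        // context, and releases the core in the finally block shown above.
        val outputs = model.predict(inputs)
        // ... consume outputs ...
    } finally {
        model.close() // closes the graph and the PredictionContextDispatcher
    }
}
```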