11package org.jetbrains.kotlinx.dataframe.impl.api
22
3+ import io.github.oshai.kotlinlogging.KotlinLogging
34import org.jetbrains.kotlinx.dataframe.AnyFrame
45import org.jetbrains.kotlinx.dataframe.AnyRow
56import org.jetbrains.kotlinx.dataframe.ColumnsSelector
@@ -11,13 +12,13 @@ import org.jetbrains.kotlinx.dataframe.api.ConvertSchemaDsl
1112import org.jetbrains.kotlinx.dataframe.api.ConverterScope
1213import org.jetbrains.kotlinx.dataframe.api.ExcessiveColumns
1314import org.jetbrains.kotlinx.dataframe.api.Infer
15+ import org.jetbrains.kotlinx.dataframe.api.add
1416import org.jetbrains.kotlinx.dataframe.api.all
1517import org.jetbrains.kotlinx.dataframe.api.allNulls
1618import org.jetbrains.kotlinx.dataframe.api.asColumnGroup
1719import org.jetbrains.kotlinx.dataframe.api.concat
1820import org.jetbrains.kotlinx.dataframe.api.convertTo
1921import org.jetbrains.kotlinx.dataframe.api.emptyDataFrame
20- import org.jetbrains.kotlinx.dataframe.api.getColumnPaths
2122import org.jetbrains.kotlinx.dataframe.api.isEmpty
2223import org.jetbrains.kotlinx.dataframe.api.map
2324import org.jetbrains.kotlinx.dataframe.api.name
@@ -29,12 +30,14 @@ import org.jetbrains.kotlinx.dataframe.columns.ColumnGroup
2930import org.jetbrains.kotlinx.dataframe.columns.ColumnKind
3031import org.jetbrains.kotlinx.dataframe.columns.ColumnPath
3132import org.jetbrains.kotlinx.dataframe.columns.FrameColumn
33+ import org.jetbrains.kotlinx.dataframe.columns.UnresolvedColumnsPolicy
3234import org.jetbrains.kotlinx.dataframe.columns.toColumnSet
3335import org.jetbrains.kotlinx.dataframe.exceptions.ExcessiveColumnsException
3436import org.jetbrains.kotlinx.dataframe.exceptions.TypeConversionException
3537import org.jetbrains.kotlinx.dataframe.impl.emptyPath
36- import org.jetbrains.kotlinx.dataframe.impl.schema.createEmptyColumn
38+ import org.jetbrains.kotlinx.dataframe.impl.getColumnPaths
3739import org.jetbrains.kotlinx.dataframe.impl.schema.createEmptyDataFrame
40+ import org.jetbrains.kotlinx.dataframe.impl.schema.createNullFilledColumn
3841import org.jetbrains.kotlinx.dataframe.impl.schema.extractSchema
3942import org.jetbrains.kotlinx.dataframe.impl.schema.render
4043import org.jetbrains.kotlinx.dataframe.kind
@@ -45,6 +48,8 @@ import kotlin.reflect.KType
4548import kotlin.reflect.full.withNullability
4649import kotlin.reflect.jvm.jvmErasure
4750
// File-level logger; in this file it records, at debug level, the exception raised when a missing column cannot be null-filled automatically.
51+ private val logger = KotlinLogging .logger {}
52+
/**
 * Holder for a value conversion: [transform] is the conversion lambda, invoked with a
 * [ConverterScope] receiver on an `Any?` input and returning `Any?`; [skipNulls] is a flag stored with it.
 * NOTE(review): presumably `skipNulls = true` means null values bypass [transform] — usage is not visible here; confirm.
 */
4853private open class Converter (val transform : ConverterScope .(Any? ) -> Any? , val skipNulls : Boolean )
4954
/**
 * A filler registered through the conversion DSL (`fill { ... }.with { ... }`).
 * [columns] is the selector from the `fill { }` part and is resolved to column paths during conversion;
 * [expr] is the expression from the `with { }` part and is evaluated inside `update { }.with { }`
 * for columns that already exist and inside `add` for columns that are still missing.
 */
5055private class Filler (val columns : ColumnsSelector <* , * >, val expr : RowExpression <* , * >)
@@ -252,22 +257,16 @@ internal fun AnyFrame.convertToImpl(
252257 }
253258 }.toMutableList()
254259
255- // when the target is nullable but the source does not contain a column, fill it in with nulls / empty dataframes
260+ // when the target is nullable but the source does not contain a column,
261+ // fill it in with nulls / empty dataframes
256262 val size = this .size.nrow
257263 schema.columns.forEach { (name, targetColumn) ->
258- val isNullable =
259- // like value column of type Int?
260- targetColumn.nullable ||
261- // like value column of type Int? (backup check)
262- targetColumn.type.isMarkedNullable ||
263- // like DataRow<Something?> for a group column (all columns in the group will be nullable)
264- targetColumn.contentType?.isMarkedNullable == true ||
265- // frame column can be filled with empty dataframes
266- targetColumn.kind == ColumnKind .Frame
267-
268264 if (name !in visited) {
269- newColumns + = targetColumn.createEmptyColumn(name, size)
270- if (! isNullable) {
265+ try {
266+ newColumns + = targetColumn.createNullFilledColumn(name, size)
267+ } catch (e: IllegalStateException ) {
268+ logger.debug(e) { " " }
269+ // if the column could not be created automatically, it needs to be filled manually
271270 missingPaths.add(path + name)
272271 }
273272 }
@@ -279,14 +278,39 @@ internal fun AnyFrame.convertToImpl(
279278 val marker = MarkersExtractor .get(clazz)
280279 var result = convertToSchema(marker.schema, emptyPath())
281280
281+ /*
282+ * Here we handle all registered fillers of the user.
283+ * Fillers are registered in the DSL like:
284+ * ```kt
285+ * df.convertTo<Target> {
286+ * fill { col1 and col2 }.with { something }
287+ * fill { col3 }.with { somethingElse }
288+ * }
289+ * ```
290+ * Users can use this to fill up any column that was missing during the conversion.
291+ * They can also fill up and thus overwrite any existing column here.
292+ */
282293 dsl.fillers.forEach { filler ->
283- val paths = result.getColumnPaths(filler.columns)
284- missingPaths.removeAll(paths.toSet())
285- result = result.update { paths.toColumnSet() }.with {
286- filler.expr(this , this )
294+ // get all paths from the `fill { col1 and col2 }` part
295+ val paths = result.getColumnPaths(UnresolvedColumnsPolicy .Create , filler.columns).toSet()
296+
297+ // split the paths into those that are already in the df and those that are missing
298+ val (newPaths, existingPaths) = paths.partition { it in missingPaths }
299+
300+ // first fill cols that are already in the df using the `with {}` part of the dsl
301+ result = result.update { existingPaths.toColumnSet() }.with { filler.expr(this , this ) }
302+
303+ // then create any missing ones by filling using the `with {}` part of the dsl
304+ result = newPaths.fold(result) { df, newPath ->
305+ df.add(newPath, Infer .Type ) { filler.expr(this , this ) }
287306 }
307+
308+ // remove the paths that are now filled
309+ missingPaths - = paths
288310 }
289311
312+ // Inform the user which target columns could not be created during the conversion.
313+ // The user needs to supply extra information for these, e.g. by `fill {}`-ing them.
290314 if (missingPaths.isNotEmpty()) {
291315 throw IllegalArgumentException (
292316 " The following columns were not found in DataFrame: ${
0 commit comments