Features for 0.2.0 #29

Merged
istreeter merged 4 commits into develop from pubsub-sink-wrap-bytes
Nov 25, 2023

Conversation

istreeter
Contributor

No description provided.

@istreeter force-pushed the pubsub-sink-wrap-bytes branch 4 times, most recently from 72d55e4 to b522173, on November 23, 2023 06:53
@istreeter force-pushed the pubsub-sink-wrap-bytes branch from b522173 to bb2367c on November 23, 2023 16:29
@istreeter changed the title from "pubsub sink: wrap bytes instead of copying (close #28)" to "Features for 0.2.0" on Nov 23, 2023
Contributor

@benjben left a comment

Looks great!

*/
package com.google.protobuf

object UnsafeSnowplowOps {
Contributor

Why unsafe? 😄

* The resulting `ListOfList` does not have the same order as the input List. This is helpful in
* Snowplow apps where order of events within batches is not important.
*/
def mapUnordered[B](f: A => B): ListOfList[B] =
Contributor

Suggested change
- def mapUnordered[B](f: A => B): ListOfList[B] =
+ def map[B](f: A => B): ListOfList[B] =

Even though we don't care, the order ends up being preserved

Contributor Author

It is not preserved!

scala> val lol = ListOfList.of(List(List(1,2,3), List(4,5,6)))
val lol: com.snowplowanalytics.snowplow.sinks.ListOfList[Int] = com.snowplowanalytics.snowplow.sinks.ListOfList@3fdd4a80

scala> lol.copyToIndexedSeq
val res1: IndexedSeq[Int] = Vector(1, 2, 3, 4, 5, 6)

scala> lol.mapUnordered(identity).copyToIndexedSeq
val res2: IndexedSeq[Int] = Vector(6, 5, 4, 3, 2, 1)
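
One way such a reversal can arise is sketched below. This is only an illustration, not the library's implementation: `SimpleListOfList` is a hypothetical stand-in that assumes the mapped elements are accumulated by prepending to a `List`, which is cheap but reverses the order.

```scala
// Hypothetical stand-in for ListOfList; NOT the real snowplow class.
final class SimpleListOfList[A](val nested: List[List[A]]) {

  // Assumption: results are accumulated by prepending (O(1) per element),
  // so the output order is the reverse of the input order.
  def mapUnordered[B](f: A => B): SimpleListOfList[B] = {
    val mapped = nested.foldLeft(List.empty[B]) { (acc, inner) =>
      inner.foldLeft(acc)((acc2, a) => f(a) :: acc2)
    }
    new SimpleListOfList(List(mapped))
  }

  // Flattens the nested lists in their stored order.
  def copyToIndexedSeq: IndexedSeq[A] = nested.flatten.toIndexedSeq
}

object SimpleListOfListDemo {
  def main(args: Array[String]): Unit = {
    val lol = new SimpleListOfList(List(List(1, 2, 3), List(4, 5, 6)))
    println(lol.copyToIndexedSeq)                        // Vector(1, 2, 3, 4, 5, 6)
    println(lol.mapUnordered(identity).copyToIndexedSeq) // Vector(6, 5, 4, 3, 2, 1)
  }
}
```

Under that assumption, skipping the final reversal is what makes an unordered map cheaper than an order-preserving one.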

Contributor

I get it now, it is because of .asIterable.toList that it is preserved here:

def nonEmpty3 = {
  val input    = ListOfList.of(List(List(1, 2, 3), List(4, 5, 6)))
  val expected = List(100, 200, 300, 400, 500, 600)
  val result   = input.mapUnordered(_ * 100).asIterable.toList

  result must containTheSameElementsAs(expected)
}

Contributor Author

Oh I see the confusion, but that's just a specs2 thing.

In specs2, containTheSameElementsAs only tests that all elements are present in any order. The documentation is here and it contains this example:

Finally, if you want to get the differences between 2 traversables:
Seq(2, 4, 1) must containTheSameElementsAs(Seq(1, 4, 2))
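
To make the difference concrete, here is a small sketch of a specs2 spec contrasting the order-insensitive matcher with an order-sensitive comparison. The spec name `OrderSensitivitySpec` and the sample values are made up for illustration, and it assumes specs2 is on the test classpath.

```scala
import org.specs2.mutable.Specification

// Hypothetical spec, for illustration only.
class OrderSensitivitySpec extends Specification {

  val reversed = Seq(3, 2, 1)
  val expected = Seq(1, 2, 3)

  "containTheSameElementsAs" should {
    "pass even when the elements are in a different order" in {
      reversed must containTheSameElementsAs(expected)
    }
  }

  "plain equality" should {
    "distinguish sequences that differ only in order" in {
      // Seq equality is positional, so a reversed result is not equal.
      (reversed == expected) must beFalse
    }
    "pass when the order matches" in {
      reversed.reverse must beEqualTo(expected)
    }
  }
}
```

So a test that uses `containTheSameElementsAs` says nothing either way about whether `mapUnordered` preserves order.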

Comment on lines 113 to 128
  private case class PendingBatch[B](value: Option[B], weight: Long)

  private object PendingBatch {
  def empty[B]: PendingBatch[B] = PendingBatch(None, 0L)
  }

  /**
  * The result of combining a chunk of `A`s, while not exceeding total weight.
  *
  * @param notAtSize
- * Optionally an `A` that does not yet exceed the maximum allowed size. We should not emit this
- * `A` but instead wait in case we can combine it with other `A`s later.
+ * Optionally a batch `B` that does not yet exceed the maximum allowed size. We should not emit
+ * this `B` but instead wait in case we can combine it with other `A`s later.
  * @param toEmit
- * The combined `A`s which meet size requirements. These should be emitted downstream because we
+ * The combined `B`s which meet size requirements. These should be emitted downstream because we
  * cannot combine them with anything more.
  */
- private case class CombineByWeightResult[A](notAtSize: Option[A], toEmit: Vector[A])
+ private case class CombineByWeightResult[B](
+ doNotEmitYet: Option[B],
+ pendingWeight: Long,
+ toEmit: Vector[B]
+ )
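
The idea described in the doc comment (emit full batches, hold back the last under-weight one) can be sketched roughly as follows. This is not the PR's code: `Pending`, `Result`, `combineByWeight` and its parameters (`maxWeight`, `weightOf`, `single`, `append`) are hypothetical names chosen for illustration, and the sketch assumes a batch is closed as soon as adding the next item would exceed the limit.

```scala
// Hypothetical, simplified shapes loosely modelled on the diff; NOT the PR's code.
final case class Pending[B](value: Option[B], weight: Long)
final case class Result[B](doNotEmitYet: Option[B], pendingWeight: Long, toEmit: Vector[B])

object CombineByWeightSketch {

  // Folds over the items, growing the pending batch until the next item would push it
  // over maxWeight; at that point the pending batch is emitted and a new one is started.
  // Assumption: a single item heavier than maxWeight still forms a batch on its own.
  def combineByWeight[A, B](items: List[A], maxWeight: Long, start: Pending[B])(
    weightOf: A => Long,
    single: A => B,
    append: (B, A) => B
  ): Result[B] = {
    val (pending, emitted) = items.foldLeft((start, Vector.empty[B])) {
      case ((Pending(None, _), out), a) =>
        (Pending(Some(single(a)), weightOf(a)), out)
      case ((Pending(Some(b), w), out), a) =>
        val wa = weightOf(a)
        if (w + wa > maxWeight) (Pending(Some(single(a)), wa), out :+ b)
        else (Pending(Some(append(b, a)), w + wa), out)
    }
    Result(pending.value, pending.weight, emitted)
  }

  def main(args: Array[String]): Unit = {
    // Strings weighted by length, combined by concatenation, limit of 5.
    val result = combineByWeight[String, String](
      List("aa", "bbb", "c", "dddd"),
      maxWeight = 5L,
      start     = Pending[String](None, 0L)
    )(_.length.toLong, identity, _ + _)
    println(result) // Result(Some(cdddd),5,Vector(aabbb))
  }
}
```

In this sketch the not-yet-emitted batch and its weight travel together in the result, so the caller can pass them back in as `start` when the next chunk arrives.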
Contributor

I find it a bit hard to follow having both a PendingBatch and a CombineByWeightResult, which is supposed to be "The result of combining a chunk" but can nevertheless be incomplete.

Contributor Author

Good point. I re-worked the classes and return types a bit, which I think makes it neater now.

private case class PendingBatch[B](value: Option[B], weight: Long)

private object PendingBatch {
def empty[B]: PendingBatch[B] = PendingBatch(None, 0L)
}

/**
* The result of combining a chunk of `A`s, while not exceeding total weight.
*
* @param notAtSize
Contributor

Suggested change
- * @param notAtSize
+ * @param doNotEmitYet

@istreeter force-pushed the pubsub-sink-wrap-bytes branch from bb2367c to b326dd4 on November 25, 2023 20:27
@istreeter merged commit b326dd4 into develop on Nov 25, 2023
1 check passed
@istreeter deleted the pubsub-sink-wrap-bytes branch on November 25, 2023 20:36
2 participants