Beam backend: use TypedPipe descriptions as names for PTransforms #1983
base: develop
Conversation
Problem: When debugging Beam jobs running on Dataflow, it is hard to map a failure in the Dataflow UI back to the corresponding line in the Scalding code. Solution: Assign the description of the TypedPipe as the name of the PTransform (this name appears in the UI). This allows mapping the error back to the line in the Scalding code since, by default, the descriptions contain the line numbers.
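For readers unfamiliar with Scalding descriptions, a minimal standalone sketch of the idea (the `Pipe` model and `ptransformName` helper are illustrative, not the actual backend code; real Scalding descriptions are `(String, Boolean)` pairs whose text defaults to a source location like `MyJob.scala:42`):

```scala
// Illustrative model only: a pipe carrying (description, dedupe-flag) pairs,
// where the description text defaults to the source line that built the pipe.
case class Pipe(descriptions: List[(String, Boolean)])

// Use the first description text as the PTransform name, if one exists.
def ptransformName(p: Pipe): Option[String] =
  p.descriptions.headOption.map(_._1)

val p = Pipe(List(("MyJob.scala:42", true)))
// ptransformName(p) == Some("MyJob.scala:42"), which is what would
// surface in the Dataflow UI and point back at the Scalding source line.
```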
Navin Viswanath seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it.
Codecov Report
```diff
@@             Coverage Diff             @@
##            develop    #1983      +/-  ##
=============================================
+ Coverage     35.74%   37.92%    +2.17%
- Complexity     1079     1153       +74
=============================================
  Files           316      316
  Lines         20941    21147      +206
  Branches       2850     2919       +69
=============================================
+ Hits           7485     8019      +534
+ Misses        12557    12136      -421
- Partials        899      992       +93
```
Continue to review full report at Codecov.
Just had a couple of minor comments.
scalding-beam/src/main/scala/com/twitter/scalding/beam_backend/BeamBackend.scala
```scala
    rec[a](wd.input)
  case (wd: WithDescriptionTypedPipe[_], rec) => {
    val op = rec(wd.input)
    op.withName(wd.descriptions.map(_._1).head)
```
`.head` can throw. Can we instead do:

```scala
wd.descriptions match {
  case head :: _ =>
    op.withName(head._1)
  case Nil =>
    op
}
```
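A standalone sketch of why the match is safer than `.head` (plain `List` code, not the backend itself):

```scala
// An empty description list makes `.head` throw NoSuchElementException...
val empty: List[(String, Boolean)] = Nil
val headFails = scala.util.Try(empty.head).isFailure // true

// ...while the suggested pattern match just leaves the op unnamed:
def nameOf(descriptions: List[(String, Boolean)]): Option[String] =
  descriptions match {
    case head :: _ => Some(head._1)
    case Nil       => None
  }
```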
Done. I thought that for a `WithDescriptionTypedPipe` this could not be empty.
it maybe can't but we didn't make that clear in the types... :/
```diff
@@ -116,7 +134,7 @@ object BeamPlanner {
       val ops: Seq[BeamOp[(K, Any)]] = cg.inputs.map(tp => rec(tp))
       CoGroupedOp(cg, ops)
     }
-    go(cg)
+    if (cg.descriptions.isEmpty) go(cg) else go(cg).withName(cg.descriptions.last)
```
why using `last` here but `head` above?
I meant to ask you about this myself. If you look at the test case in this PR, the cogrouped expression has two descriptions, "Count words" and "Join with t1", both of which appear in the descriptions of `CoGrouped`. But since this appears to be appending descriptions, e.g. in `CoGrouped.Pair` and the optimization rule `ComposeDescriptions`, I chose to keep the last one. Hence no assertion for "Count words" even in that test case.
Why not combine all of them? Why not just `cg.descriptions.mkString("\n")` or something?
Yes, let me try this out on a job first. Thanks!
```diff
@@ -137,7 +155,21 @@ object BeamPlanner {

   def defaultOptimizationRules(config: Config): Seq[Rule[TypedPipe]] = {
     def std(forceHash: Rule[TypedPipe]) =
       OptimizationRules.standardMapReduceRules :::
         List(
```
why the change here? Is this copying the same cascading optimizations?
It really is the `standardMapReduceRules` with `DescribeLater` excluded. I've mentioned in the summary of this PR that I chose to exclude that rule in order to capture the line numbers at the right points, without the optimizer changing the AST. I'm curious if there's a better way to do this though.
sorry I missed that. Note: the line numbers are already captured by the time that rule runs. The line numbers are collected on the `TypedPipe`. I don't see how removing that rule helps descriptions.

If I were you, I would look at all the descriptions and add the full list.

The problem with removing that rule is that it will block merging nodes together. It may be fine, maybe Beam will follow up with optimizations, but I would be careful: scalding may do some optimizations that beam doesn't.
The line numbers are in the descriptions though, if users don't explicitly add them with a call to `withDescription`.

Here's what I'm trying to do: if we have a `TypedPipe` that looks something like this:

```scala
WithDescription(Mapped(WithDescription(FlatMapped(), "a")), "b")
```

I'm trying to associate the description "a" with the PTransform corresponding to the `FlatMapped` and the description "b" with the PTransform corresponding to the `Mapped`. By default, the "a" and "b" here are line numbers unless the user explicitly added their own descriptions.

If I let `DescribeLater` run, I think we'll end up with something like this:

```scala
WithDescription(FlatMapped(), ["a", "b"])
```

(since this would also compose the `Mapped` and `FlatMapped`, and then also run `ComposeDescriptions`). This would be hard to debug when there is an error in this stage in the job, since it's hard to map back to the code.

But as I write this, I'm thinking it's probably better to just concatenate the line numbers instead...

Dataflow (which is the Beam runner we're using) does apply similar optimizations (I've mentioned this in the summary), but I'm debating now if it's a good idea to change what the optimizer does, since it definitely makes the optimizer less useful. What do you think?
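To make the trade-off above concrete, a toy model of the two shapes being discussed (hypothetical case classes, not Scalding's real AST):

```scala
// Toy AST: nodes carry their own description lists.
sealed trait Node
case class FlatMapped(descriptions: List[String]) extends Node
case class Mapped(input: Node, descriptions: List[String]) extends Node

// Before optimization: "a" can name the FlatMapped step, "b" the Mapped step.
val before = Mapped(FlatMapped(List("a")), List("b"))

// After fusing Mapped into FlatMapped and composing descriptions,
// one node (and thus one PTransform name) must carry both labels.
val after = FlatMapped(List("a", "b"))
```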
I don't really know the answer. So, line numbers are added to descriptions when the user constructs the TypedPipe, see scalding/scalding-base/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala (line 533 in 6434348):

```scala
private[scalding] def withLine: TypedPipe[T] =
```
The idea of `DescribeLater` is to try to merge as many operations into one step and then put all the line numbers/descriptions on that single step. By taking just a single item you are hiding which actions might be combined.

As to what to do, I don't know. If beam's optimizer is very good, it maybe doesn't matter. Maybe you should try two separate runs with a somewhat complex job and compare?
Also, I can imagine a `Config` setting like `scalding.preserve_description_order` that removes applying that optimization. Then if you were debugging, you could set that to true, but for a production job you could set it to false.

I would probably bias to just combining all the descriptions into a single beam description unless you actually see problems. You can always come back and add that setting. I would personally bias to maximizing the utility of the optimizer.
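A hedged sketch of that toggle; `scalding.preserve_description_order` is the suggested key (not an existing Scalding option), and the rule names here are stand-in strings rather than real `Rule[TypedPipe]` values:

```scala
// Pick the rule list based on a hypothetical debug flag in the Config.
def optimizationRules(config: Map[String, String]): List[String] = {
  val preserve =
    config.get("scalding.preserve_description_order").contains("true")
  val standard = List("ComposeMap", "ComposeFlatMap", "DescribeLater")
  // When debugging, drop DescribeLater so descriptions stay where they
  // were written; otherwise keep the full optimizer.
  if (preserve) standard.filterNot(_ == "DescribeLater") else standard
}
```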
Thanks. Let me try out some jobs with the rule enabled and showing all the descriptions.
```diff
 import com.twitter.scalding.typed.functions.ComposedFunctions.ComposedMapGroup
 import com.twitter.scalding.typed.functions.{EmptyGuard, MapValueStream, ScaldingPriorityQueueMonoid, SumAll}
-import com.twitter.scalding.typed.{CoGrouped, Input}
+import com.twitter.scalding.typed.{CoGrouped, TypedSource}
```
Why the change back from Input?
(I think I know the answer: you all are actually using a fork... I really hope you will get onto the mainline branch. I am contributing my time to help you, I hope you will contribute your energy to help others by actually using the open source version).
I didn't really change it back...this was just a missed import, but you're right that we're using a fork. Thank you for investing your time in this, it really is much appreciated! I have it on my todo list to get us on the mainline branch. I do need to go through your earlier refactor more closely though.
I don't follow. I changed this code to use `Input`, which is a subclass and replacement for `TypedSource`. It seems the diff changes it back from Input to TypedSource. Can you restate what you meant by "missed import"?
Sorry I wasn't clear. This change is now outdated - I updated it to remove the reference to `TypedSource`. The reason it was there in the first place was because I brought it over from our fork, but I hadn't removed unused imports, which is why it showed up. You should see no reference to `TypedSource` now in the diff.
scalding-beam/src/main/scala/com/twitter/scalding/beam_backend/BeamBackend.scala
```scala
    rec[a](wd.input)
  case (wd: WithDescriptionTypedPipe[_], rec) => {
    val op = rec(wd.input)
    wd.descriptions match {
```
actually, as commented, I think this is probably running the risk of dropping some descriptions. I would do:

```scala
op.withName(wd.descriptions.map(_._1).mkString(", "))
```

otherwise I think you will wind up with cases where you lose line numbers.
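What the suggested `mkString` would produce for a pipe with multiple descriptions (illustrative values):

```scala
val descriptions: List[(String, Boolean)] =
  List(("MyJob.scala:42", true), ("Count words", true))

// Combining keeps every description, so no line number is dropped:
val name = descriptions.map(_._1).mkString(", ")
// name == "MyJob.scala:42, Count words"
```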
Thanks, I'll try this out.
this generally looks good, I'm just worried about two things:
- disabling the moving of descriptions, which deactivates many (most?) optimizations
- not including all the descriptions (I think you will lose line numbers).
Problem: When debugging Beam jobs running on Dataflow, it is hard to map a failure in the Dataflow UI back to the corresponding line in the Scalding code.

Solution: Assign the description of the TypedPipe as the name of the PTransform (this name appears in the Dataflow UI). This allows mapping the error back to the line in the Scalding code since, by default, the descriptions contain the line numbers.

In order to make this work, I changed the Beam backend optimization rules to not use the `DescribeLater` rule, which moves a `WithDescriptionTypedPipe` higher in the AST. This was necessary in order to preserve the location of the description in the AST and apply it to the right PTransform in Beam. This possibly introduces a few unintended consequences for the optimizer:

- `ComposeMap`, `ComposeFlatMap`, etc. may not be applicable since those rely on `DescribeLater` being run in an earlier phase. While such optimizations don't seem to exist in Beam, they do have something along these lines in Dataflow. So this should ideally not have any performance implications, at least in Dataflow.
- `DescribeLater` also allows `ComposeDescriptions` to work better, which reduces the number of nodes in the AST. It's possible that removing `DescribeLater` can make the AST very large.