Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36070][flink-runtime-web] Modifying web UI to adapt to Incremental JobGraph Generation #25804

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,7 @@ void testJobDetailsContainsSlotSharingGroupId() throws Exception {
Collections.singletonMap(JobStatus.RUNNING, 1L),
jobVertexDetailsInfos,
Collections.singletonMap(ExecutionState.RUNNING, 1),
new JobPlanInfo.RawJson("{\"id\":\"1234\"}"),
new JobPlanInfo.RawJson("{\"id\":\"1234\"}"));
final TestJobDetailsInfoHandler jobDetailsInfoHandler =
new TestJobDetailsInfoHandler(jobDetailsInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,39 @@
>
<xhtml:div
class="node-label-wrapper"
[class.initializing]="!initialized"
[style.border-color]="borderColor"
[style.background-color]="backgroundColor"
>
<h4 class="content-wrap">
<xhtml:div class="detail">{{ operator }}</xhtml:div>
<xhtml:div class="detail description">{{ description }}</xhtml:div>
<xhtml:div class="detail description" *ngIf="initialized">{{ description }}</xhtml:div>
<xhtml:div class="node-label">Parallelism: {{ parallelism }}</xhtml:div>
<xhtml:div
class="node-label metric"
title="Maximum back pressured percentage across all subtasks"
*ngIf="initialized"
>
Backpressured (max): {{ prettyPrint(backPressuredPercentage) }}
</xhtml:div>
<xhtml:div class="node-label metric" title="Maximum busy percentage across all subtasks">
<xhtml:div
class="node-label metric"
title="Maximum busy percentage across all subtasks"
*ngIf="initialized"
>
Busy (max): {{ prettyPrint(busyPercentage) }}
</xhtml:div>
<xhtml:div class="node-label metric" title="Data skew percentage across all subtasks">
<xhtml:div
class="node-label metric"
title="Data skew percentage across all subtasks"
*ngIf="initialized"
>
Data Skew: {{ prettyPrint(dataSkewPercentage) }}
</xhtml:div>
<xhtml:div class="node-label metric" *ngIf="lowWatermark">
<xhtml:div class="node-label metric" *ngIf="lowWatermark && initialized">
Low Watermark: {{ lowWatermark }}
</xhtml:div>
<xhtml:div class="detail last" *ngIf="operatorStrategy">
<xhtml:div class="detail last" *ngIf="operatorStrategy && initialized">
Operation: {{ operatorStrategy }}
</xhtml:div>
</h4>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,13 @@
margin-bottom: 0;
}
}

&.initializing {
border: none;
background: linear-gradient(to right, #000 50%, #fff 0%) top/20px 1px repeat-x,
/* top */ linear-gradient(#000 50%, #fff 0%) right/1px 20px repeat-y,
/* right */ linear-gradient(to right, #000 50%, #fff 0%) bottom/20px 1px repeat-x,
/* bottom */ linear-gradient(#000 50%, #fff 0%) left/1px 20px repeat-y; /* left */
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,18 @@ export class NodeComponent {
backPressuredPercentage: number | undefined = NaN;
busyPercentage: number | undefined = NaN;
dataSkewPercentage: number | undefined = NaN;
backgroundColor: string | undefined;
borderColor: string | undefined;
initialized: boolean = false;
backgroundColor: string;
borderColor: string;
height = 0;
id: string;
backgroundBusyColor = '#ee6464';
backgroundDefaultColor = '#5db1ff';
backgroundPendingColor = '#ffffff';
backgroundBackPressuredColor = '#888888';
borderBusyColor = '#ee2222';
borderDefaultColor = '#1890ff';
borderPendingColor = '#000000';
borderBackPressuredColor = '#000000';

decodeHTML(value: string): string | null {
Expand All @@ -65,6 +68,11 @@ export class NodeComponent {
this.operatorStrategy = this.decodeHTML(value.operator_strategy);
this.parallelism = value.parallelism;
this.lowWatermark = value.lowWatermark;
if (value.initialized) {
this.initialized = true;
}
this.borderColor = !value.initialized ? this.borderPendingColor : this.borderDefaultColor;
this.backgroundColor = !value.initialized ? this.backgroundPendingColor : this.backgroundDefaultColor;
if (this.isValid(value.backPressuredPercentage)) {
this.backPressuredPercentage = value.backPressuredPercentage;
}
Expand Down Expand Up @@ -144,8 +152,6 @@ export class NodeComponent {
*/
update(node: NodesItemCorrect): void {
this.node = node;
this.backgroundColor = this.backgroundDefaultColor;
this.borderColor = this.borderDefaultColor;
if (node.busyPercentage) {
this.backgroundColor = this.blend(this.backgroundColor, this.backgroundBusyColor, node.busyPercentage / 100.0);
this.borderColor = this.blend(this.borderColor, this.borderBusyColor, node.busyPercentage / 100.0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<label
nz-checkbox
class="checkbox"
[(ngModel)]="showPendingOperators"
(ngModelChange)="onGraphTypeChanged()"
[style.visibility]="showGraphSwitch ? 'visible' : 'hidden'"
>
Show Pending Operators (Remaining: {{ pendingOperatorsCount }})
</label>
<flink-svg-container [style.visibility]="visibility" (transformEvent)="onTransform($event)">
<svg:g class="graph" #graphElement>
<defs>
Expand Down Expand Up @@ -54,6 +62,7 @@
class="edge"
[id]="'link-' + link.id"
[class.focused]="link.options?.focused"
[class.initializing]="!link?.initialized"
[attr.marker-end]="'url(#end-arrow' + (link.options?.focused ? '-focus' : '') + ')'"
></svg:path>
<svg:text class="edge-label" text-anchor="middle" dy="20">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ nz-slider {
margin-top: -100px;
}

.checkbox {
margin: 10px;
}

.graph {
user-select: none;

Expand Down Expand Up @@ -59,6 +63,14 @@ nz-slider {
stroke-linecap: round;
stroke-opacity: 1;
}

&.initializing {
stroke: @text-color;
stroke-dasharray: 10;
stroke-dashoffset: 0;
stroke-linecap: round;
stroke-opacity: 1;
}
}

.link-group {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { FormsModule } from '@angular/forms';
import { NodesItemCorrect, NodesItemLink } from '@flink-runtime-web/interfaces';
import { select } from 'd3-selection';
import { zoomIdentity } from 'd3-zoom';
import { NzCheckboxModule } from 'ng-zorro-antd/checkbox';
import { NzSliderModule } from 'ng-zorro-antd/slider';

import { NodeComponent } from './components/node/node.component';
Expand All @@ -51,7 +52,7 @@ enum Visibility {
templateUrl: './dagre.component.html',
styleUrls: ['./dagre.component.less'],
changeDetection: ChangeDetectionStrategy.OnPush,
imports: [SvgContainerComponent, NodeComponent, NzSliderModule, FormsModule, CommonModule],
imports: [SvgContainerComponent, NodeComponent, NzSliderModule, FormsModule, CommonModule, NzCheckboxModule],
standalone: true
})
export class DagreComponent extends NzGraph {
Expand All @@ -60,6 +61,7 @@ export class DagreComponent extends NzGraph {
focusedLinkIds: string[] = [];
selectedNodeId: string | null;
zoom = 1;
showPendingOperators = false;
cacheTransform = { x: 0, y: 0, k: 1 };
oldTransform = { x: 0, y: 0, k: 1 };
cacheNodes: NodesItemCorrect[] = [];
Expand All @@ -72,7 +74,10 @@ export class DagreComponent extends NzGraph {
@ViewChild('overlayElement', { static: true }) overlayElement: ElementRef;
@Input() xCenter = 2;
@Input() yCenter = 2;
@Input() showGraphSwitch: boolean = false;
@Input() pendingOperatorsCount: number = 0;
@Output() nodeClick = new EventEmitter<LayoutNode | null>();
@Output() graphTypeChanged = new EventEmitter<boolean>();

/**
* Update Node detail
Expand Down Expand Up @@ -139,6 +144,10 @@ export class DagreComponent extends NzGraph {
}
}

onGraphTypeChanged(): void {
this.graphTypeChanged.emit(this.showPendingOperators);
}

/**
* Flush graph with nodes and links
*
Expand Down Expand Up @@ -263,7 +272,7 @@ export class DagreComponent extends NzGraph {
if ($event) {
$event.stopPropagation();
}
if (node) {
if (node && node.initialized) {
if (emit) {
this.nodeClick.emit(node);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ export class NzGraph {
focused: false,
dominantBaseline: this.getDominantBaseline(edge)
},
detail: { ...edge }
detail: { ...edge },
initialized: edge.initialized
};
this.layoutLinks.push({ ...link, options: { ...link.options } });
this.copyLayoutLinks.push({ ...link, options: { ...link.options } });
Expand Down
16 changes: 15 additions & 1 deletion flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export interface JobStatusCounts {
FINISHED: number;
FAILED: number;
RECONCILING: number;
PENDING: number;
}

interface TimestampsStatus {
Expand Down Expand Up @@ -57,6 +58,7 @@ export interface JobDetail {
vertices: VerticesItem[];
'status-counts': JobStatusCounts;
plan: Plan;
'stream-graph-plan': StreamGraphPlan;
}

interface Plan {
Expand All @@ -66,6 +68,11 @@ interface Plan {
nodes: NodesItem[];
}

interface StreamGraphPlan {
pending_operator_count: number;
nodes: StreamNodesItem[];
}

interface InputsItem {
num: number;
id: string;
Expand Down Expand Up @@ -107,6 +114,7 @@ export interface TasksStatus {
RECONCILING: number;
CANCELING: number;
INITIALIZING: number;
PENDING: number;
}

interface MetricsStatus {
Expand All @@ -132,12 +140,17 @@ export interface NodesItem {
height?: number;
}

export interface StreamNodesItem extends NodesItem {
job_vertex_id: string;
}

export interface NodesItemCorrect extends NodesItem {
detail: VerticesItem | undefined;
detail?: VerticesItem;
lowWatermark?: number;
backPressuredPercentage?: number;
busyPercentage?: number;
dataSkewPercentage?: number;
initialized?: boolean;
}

export interface NodesItemLink {
Expand All @@ -147,6 +160,7 @@ export interface NodesItemLink {
width?: number;
ship_strategy?: string;
local_strategy?: string;
initialized?: boolean;
}

export interface JobDetailCorrect extends JobDetail {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ export interface VertexTaskManagerDetail {
RUNNING: number;
SCHEDULED: number;
INITIALIZING: number;
PENDING: number;
};
aggregated: JobVertexAggregated;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@
nzMessage="Job is not running yet."
></nz-alert>
<div class="container" [style.height.px]="top">
<flink-dagre (nodeClick)="onNodeClick($event)" [xCenter]="4"></flink-dagre>
<flink-dagre
(nodeClick)="onNodeClick($event)"
[xCenter]="4"
[showGraphSwitch]="jobType === 'BATCH'"
[pendingOperatorsCount]="nodes.length - initializedNodes.length"
(graphTypeChanged)="refreshGraph($event)"
></flink-dagre>
<router-outlet></router-outlet>
</div>

<ng-container *ngIf="nodes.length > 0">
<flink-job-overview-list
(nodeClick)="onNodeClick($event)"
(rescale)="onRescale($event)"
[nodes]="nodes"
[nodes]="initializedNodes"
[selectedNode]="selectedNode"
></flink-job-overview-list>
<flink-resize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ import { JobLocalService } from '../job-local.service';
})
export class JobOverviewComponent implements OnInit, OnDestroy {
public nodes: NodesItemCorrect[] = [];
public initializedNodes: NodesItemCorrect[] = [];
public links: NodesItemLink[] = [];
public selectedNode: NodesItemCorrect | null;
public top = 500;
public jobId: string;
public timeoutId: number;
public jobType: string = 'STREAMING';

@ViewChild(DagreComponent, { static: true }) private readonly dagreComponent: DagreComponent;

Expand All @@ -79,11 +81,13 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
takeUntil(this.destroy$)
)
.subscribe(data => {
if (this.jobId !== data.plan.jid || this.nodes.length === 0) {
if (this.jobId !== data.plan.jid || this.checkNodesChanged(data.plan.nodes)) {
this.jobId = data.plan.jid;
this.jobType = data['job-type'];
this.nodes = data.plan.nodes;
this.initializedNodes = this.filterInitializedNodes(this.nodes);
this.links = data.plan.links;
this.jobId = data.plan.jid;
this.dagreComponent.flush(this.nodes, this.links, true).then();
this.refreshGraph(this.dagreComponent.showPendingOperators);
this.refreshNodesWithMetrics();
} else {
this.nodes = data.plan.nodes;
Expand All @@ -106,6 +110,14 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
});
}

private filterInitializedNodes(nodes: NodesItemCorrect[]): NodesItemCorrect[] {
return nodes.filter(node => node.initialized);
}

private initializedLinks(): NodesItemLink[] {
return this.links.filter(link => link.initialized);
}

public ngOnDestroy(): void {
this.destroy$.next();
this.destroy$.complete();
Expand Down Expand Up @@ -137,7 +149,10 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
}

public refreshNodesWithMetrics(): void {
this.mergeWithBackPressureAndSkew(this.nodes)
if (this.dagreComponent.showPendingOperators) {
return;
}
this.mergeWithBackPressureAndSkew(this.initializedNodes)
.pipe(
mergeMap(nodes => this.mergeWithWatermarks(nodes)),
takeUntil(this.destroy$)
Expand Down Expand Up @@ -184,4 +199,17 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
})
).pipe(catchError(() => of(nodes)));
}

private checkNodesChanged(updatedNodes: NodesItemCorrect[]): boolean {
const updatedInitializedNodes = this.filterInitializedNodes(updatedNodes);
return updatedInitializedNodes.length !== this.initializedNodes.length;
}

refreshGraph(showPendingOperators: boolean): void {
if (showPendingOperators) {
this.dagreComponent.flush(this.nodes, this.links, true).then();
} else {
this.dagreComponent.flush(this.initializedNodes, this.initializedLinks(), true).then();
}
}
}
Loading