Skip to content

Commit 36d2b48

Browse files
committed
fix npe in ReduceRunner..
1 parent a05482e commit 36d2b48

8 files changed

Lines changed: 34 additions & 62 deletions

File tree

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ public void processElements(Iterable<WindowedValue<InputT>> values) throws Excep
376376
emit(
377377
contextFactory.base(window, StateStyle.DIRECT),
378378
contextFactory.base(window, StateStyle.RENAMED),
379-
null);
379+
CausedByDrain.NORMAL);
380380
}
381381

382382
// We're all done with merging and emitting elements so can compress the activeWindow state.

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,11 +445,13 @@ public <T> void output(TupleTag<T> tag, T output) {
445445

446446
@Override
447447
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
448+
checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp());
448449
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
449450
}
450451

451452
@Override
452453
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
454+
checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp());
453455
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
454456
}
455457

@@ -1039,11 +1041,13 @@ public <T> void outputWindowedValue(
10391041

10401042
@Override
10411043
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
1044+
checkTimestamp(timestamp(), windowedValue.getTimestamp());
10421045
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
10431046
}
10441047

10451048
@Override
10461049
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
1050+
checkTimestamp(timestamp(), windowedValue.getTimestamp());
10471051
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
10481052
}
10491053

@@ -1308,11 +1312,13 @@ public <T> void outputWindowedValue(
13081312

13091313
@Override
13101314
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
1315+
checkTimestamp(this.timestamp, windowedValue.getTimestamp());
13111316
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
13121317
}
13131318

13141319
@Override
13151320
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
1321+
checkTimestamp(this.timestamp, windowedValue.getTimestamp());
13161322
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
13171323
}
13181324

sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestOutputReceiver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.beam.sdk.transforms.DoFn;
2424
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2525
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
26+
import org.apache.beam.sdk.values.CausedByDrain;
2627
import org.apache.beam.sdk.values.OutputBuilder;
2728
import org.apache.beam.sdk.values.WindowedValues;
2829
import org.joda.time.Instant;
@@ -54,6 +55,7 @@ public OutputBuilder<T> builder(T value) {
5455
.setWindow(fakeWindow)
5556
.setPaneInfo(PaneInfo.NO_FIRING)
5657
.setTimestamp(BoundedWindow.TIMESTAMP_MIN_VALUE)
58+
.setCausedByDrain(CausedByDrain.NORMAL)
5759
.setReceiver(windowedValue -> records.add(windowedValue.getValue()));
5860
}
5961

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,8 @@ public OutputBuilder<Row> builder(Row value) {
7070
rowWithMetadata -> {
7171
((DoFn<?, T>.WindowedContext) context)
7272
.outputWindowedValue(
73-
schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue()),
74-
rowWithMetadata.getTimestamp(),
75-
rowWithMetadata.getWindows(),
76-
rowWithMetadata.getPaneInfo());
73+
rowWithMetadata.withValue(
74+
schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue())));
7775
});
7876

7977
} else {
@@ -84,10 +82,8 @@ public OutputBuilder<Row> builder(Row value) {
8482
rowWithMetadata -> {
8583
context.outputWindowedValue(
8684
tag,
87-
schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue()),
88-
rowWithMetadata.getTimestamp(),
89-
rowWithMetadata.getWindows(),
90-
rowWithMetadata.getPaneInfo());
85+
rowWithMetadata.withValue(
86+
schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue())));
9187
});
9288
}
9389
}

sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -577,13 +577,7 @@ public OutputBuilder<T> builder(T value) {
577577
return outputBuilderSupplier
578578
.builder(value)
579579
.setReceiver(
580-
windowedValue ->
581-
outerContext.outputWindowedValue(
582-
tag,
583-
windowedValue.getValue(),
584-
windowedValue.getTimestamp(),
585-
windowedValue.getWindows(),
586-
windowedValue.getPaneInfo()));
580+
windowedValue -> outerContext.outputWindowedValue(tag, windowedValue));
587581
}
588582
};
589583
}

sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ public static <T> WindowedValue<T> of(
272272
CausedByDrain causedByDrain) {
273273
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null");
274274
checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none");
275-
275+
checkArgument(causedByDrain != null, "WindowedValue requires CausedByDrain, but it was null");
276276
if (windows.size() == 1) {
277277
return of(
278278
value,
@@ -322,6 +322,7 @@ public static <T> WindowedValue<T> of(
322322
@Nullable Long currentRecordOffset,
323323
CausedByDrain causedByDrain) {
324324
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null");
325+
checkArgument(causedByDrain != null, "WindowedValue requires CausedByDrain, but it was null");
325326

326327
boolean isGlobal = GlobalWindow.INSTANCE.equals(window);
327328
if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
7878
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
7979
import org.apache.beam.sdk.util.UserCodeException;
80+
import org.apache.beam.sdk.values.CausedByDrain;
8081
import org.apache.beam.sdk.values.OutputBuilder;
8182
import org.apache.beam.sdk.values.TypeDescriptor;
8283
import org.apache.beam.sdk.values.TypeDescriptors;
@@ -561,6 +562,7 @@ public OutputBuilder<SomeRestriction> builder(SomeRestriction value) {
561562
.setTimestamp(mockTimestamp)
562563
.setWindow(mockWindow)
563564
.setPaneInfo(PaneInfo.NO_FIRING)
565+
.setCausedByDrain(CausedByDrain.NORMAL)
564566
.setReceiver(windowedValue -> outputs.add(windowedValue.getValue()));
565567
}
566568
};
@@ -801,6 +803,7 @@ public OutputBuilder<String> builder(String value) {
801803
.setTimestamp(mockTimestamp)
802804
.setWindow(mockWindow)
803805
.setPaneInfo(PaneInfo.NO_FIRING)
806+
.setCausedByDrain(CausedByDrain.NORMAL)
804807
.setReceiver(
805808
windowedValue -> {
806809
assertFalse(invoked);

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

Lines changed: 15 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -2098,10 +2098,8 @@ public OutputBuilder<Row> builder(Row value) {
20982098
.setReceiver(
20992099
windowedRow ->
21002100
ProcessBundleContextBase.this.outputWindowedValue(
2101-
fromRowFunction.apply(windowedRow.getValue()),
2102-
windowedRow.getTimestamp(),
2103-
windowedRow.getWindows(),
2104-
windowedRow.getPaneInfo()));
2101+
windowedRow.withValue(
2102+
fromRowFunction.apply(windowedRow.getValue()))));
21052103
}
21062104
};
21072105

@@ -2134,12 +2132,7 @@ public OutputBuilder<T> builder(T value) {
21342132
.withValue(value)
21352133
.setReceiver(
21362134
windowedValue ->
2137-
ProcessBundleContextBase.this.outputWindowedValue(
2138-
tag,
2139-
windowedValue.getValue(),
2140-
windowedValue.getTimestamp(),
2141-
windowedValue.getWindows(),
2142-
windowedValue.getPaneInfo()));
2135+
ProcessBundleContextBase.this.outputWindowedValue(tag, windowedValue));
21432136
}
21442137
};
21452138
}
@@ -2174,10 +2167,8 @@ public OutputBuilder<Row> builder(Row value) {
21742167
windowedRow ->
21752168
ProcessBundleContextBase.this.outputWindowedValue(
21762169
tag,
2177-
fromRowFunction.apply(windowedRow.getValue()),
2178-
windowedRow.getTimestamp(),
2179-
windowedRow.getWindows(),
2180-
windowedRow.getPaneInfo()));
2170+
windowedRow.withValue(
2171+
fromRowFunction.apply(windowedRow.getValue()))));
21812172
}
21822173
};
21832174
}
@@ -2456,10 +2447,8 @@ public OutputBuilder<Row> builder(Row value) {
24562447
.setReceiver(
24572448
windowedValue ->
24582449
context.outputWindowedValue(
2459-
fromRowFunction.apply(windowedValue.getValue()),
2460-
windowedValue.getTimestamp(),
2461-
windowedValue.getWindows(),
2462-
windowedValue.getPaneInfo()));
2450+
windowedValue.withValue(
2451+
fromRowFunction.apply(windowedValue.getValue()))));
24632452
}
24642453
};
24652454

@@ -2490,14 +2479,7 @@ public OutputBuilder<T> builder(T value) {
24902479
.setTimestamp(currentTimer.getHoldTimestamp())
24912480
.setWindow(currentWindow)
24922481
.setCausedByDrain(causedByDrain)
2493-
.setReceiver(
2494-
windowedValue ->
2495-
context.outputWindowedValue(
2496-
tag,
2497-
windowedValue.getValue(),
2498-
windowedValue.getTimestamp(),
2499-
windowedValue.getWindows(),
2500-
windowedValue.getPaneInfo()));
2482+
.setReceiver(windowedValue -> context.outputWindowedValue(tag, windowedValue));
25012483
}
25022484
};
25032485
}
@@ -2532,10 +2514,8 @@ public OutputBuilder<Row> builder(Row value) {
25322514
windowedValue ->
25332515
context.outputWindowedValue(
25342516
tag,
2535-
fromRowFunction.apply(windowedValue.getValue()),
2536-
windowedValue.getTimestamp(),
2537-
windowedValue.getWindows(),
2538-
windowedValue.getPaneInfo()));
2517+
windowedValue.withValue(
2518+
fromRowFunction.apply(windowedValue.getValue()))));
25392519
}
25402520
};
25412521
}
@@ -2775,10 +2755,8 @@ public OutputBuilder<Row> builder(Row value) {
27752755
.setReceiver(
27762756
windowedValue ->
27772757
context.outputWindowedValue(
2778-
fromRowFunction.apply(windowedValue.getValue()),
2779-
windowedValue.getTimestamp(),
2780-
windowedValue.getWindows(),
2781-
windowedValue.getPaneInfo()));
2758+
windowedValue.withValue(
2759+
fromRowFunction.apply(windowedValue.getValue()))));
27822760
}
27832761
};
27842762

@@ -2810,13 +2788,7 @@ public OutputBuilder<T> builder(T value) {
28102788
.setWindow(currentWindow)
28112789
.setCausedByDrain(causedByDrain)
28122790
.setPaneInfo(currentTimer.getPaneInfo())
2813-
.setReceiver(
2814-
windowedValue ->
2815-
context.outputWindowedValue(
2816-
windowedValue.getValue(),
2817-
windowedValue.getTimestamp(),
2818-
windowedValue.getWindows(),
2819-
windowedValue.getPaneInfo()));
2791+
.setReceiver(windowedValue -> context.outputWindowedValue(windowedValue));
28202792
}
28212793
};
28222794
}
@@ -2851,10 +2823,8 @@ public OutputBuilder<Row> builder(Row value) {
28512823
.setReceiver(
28522824
windowedValue ->
28532825
context.outputWindowedValue(
2854-
fromRowFunction.apply(windowedValue.getValue()),
2855-
windowedValue.getTimestamp(),
2856-
windowedValue.getWindows(),
2857-
windowedValue.getPaneInfo()));
2826+
windowedValue.withValue(
2827+
fromRowFunction.apply(windowedValue.getValue()))));
28582828
}
28592829
};
28602830
}

0 commit comments

Comments
 (0)