Skip to content

Commit 597875e

Browse files
committed
add valuekind
1 parent a615657 commit 597875e

21 files changed

Lines changed: 804 additions & 56 deletions

File tree

model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -757,6 +757,15 @@ message Elements {
757757
}
758758
}
759759

760+
// Wrapper message for the enum describing the kind of a value. It is
// referenced by ElementMetadata.value_kind, which documents it as CDC metadata.
message ValueKind {
761+
enum Enum {
762+
// Numbered 0. ElementMetadata.value_kind documents a missing kind as
// implying INSERT for backwards compatibility.
INSERT = 0;
763+
// NOTE(review): presumably the value as it was before an update - confirm.
UPDATE_BEFORE = 1;
764+
// NOTE(review): presumably the value as it is after an update - confirm.
UPDATE_AFTER = 2;
765+
// NOTE(review): presumably marks the removal of a value - confirm.
DELETE = 3;
766+
}
767+
}
768+
760769
// Element metadata passed as part of WindowedValue to make WindowedValue
761770
// extensible and backward compatible
762771
message ElementMetadata {
@@ -770,6 +779,9 @@ message Elements {
770779
// across IOs - Kafka, PubSub, http.
771780
// Example value: congo=t61rcWkgMzE
772781
optional string tracestate = 3;
782+
// (Optional) The kind of value for CDC metadata.
783+
// If missing or unspecified, implies INSERT for backwards compatibility.
784+
optional ValueKind.Enum value_kind = 4;
773785
}
774786

775787
// Represent the encoded user timer for a given instruction, transform and

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.beam.sdk.values.PCollectionView;
5353
import org.apache.beam.sdk.values.Row;
5454
import org.apache.beam.sdk.values.TupleTag;
55+
import org.apache.beam.sdk.values.ValueKind;
5556
import org.apache.beam.sdk.values.WindowedValue;
5657
import org.apache.beam.sdk.values.WindowedValues;
5758
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -407,6 +408,11 @@ public CausedByDrain causedByDrain() {
407408
return element.causedByDrain();
408409
}
409410

411+
@Override
412+
public ValueKind valueKind() {
413+
return element.getValueKind();
414+
}
415+
410416
@Override
411417
public PipelineOptions getPipelineOptions() {
412418
return pipelineOptions;
@@ -454,6 +460,31 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp)
454460
element.causedByDrain()));
455461
}
456462

463+
@Override
464+
public void outputWithKind(OutputT output, ValueKind kind) {
465+
outputWithKind(mainOutputTag, output, kind);
466+
}
467+
468+
@Override
469+
public <T> void outputWithKind(TupleTag<T> tag, T value, ValueKind kind) {
470+
noteOutput();
471+
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
472+
((TimestampObservingWatermarkEstimator) watermarkEstimator)
473+
.observeTimestamp(element.getTimestamp());
474+
}
475+
outputReceiver.output(
476+
tag,
477+
WindowedValues.of(
478+
value,
479+
element.getTimestamp(),
480+
element.getWindows(),
481+
element.getPaneInfo(),
482+
element.getRecordId(),
483+
element.getRecordOffset(),
484+
element.causedByDrain(),
485+
kind));
486+
}
487+
457488
@Override
458489
public <T> void outputWindowedValue(
459490
TupleTag<T> tag,

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.beam.sdk.values.PCollectionView;
6262
import org.apache.beam.sdk.values.Row;
6363
import org.apache.beam.sdk.values.TupleTag;
64+
import org.apache.beam.sdk.values.ValueKind;
6465
import org.apache.beam.sdk.values.WindowedValue;
6566
import org.apache.beam.sdk.values.WindowedValues;
6667
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -415,6 +416,11 @@ public CausedByDrain causedByDrain() {
415416
return elem.causedByDrain();
416417
}
417418

419+
@Override
420+
public ValueKind valueKind() {
421+
return elem.getValueKind();
422+
}
423+
418424
@Override
419425
public <T> T sideInput(PCollectionView<T> view) {
420426
checkNotNull(view, "View passed to sideInput cannot be null");
@@ -438,6 +444,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
438444
outputWithTimestamp(mainOutputTag, output, timestamp);
439445
}
440446

447+
@Override
448+
public void outputWithKind(OutputT output, ValueKind kind) {
449+
outputWithKind(mainOutputTag, output, kind);
450+
}
451+
441452
@Override
442453
public void outputWindowedValue(
443454
OutputT output,
@@ -447,6 +458,15 @@ public void outputWindowedValue(
447458
outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo);
448459
}
449460

461+
public void outputWindowedValue(
462+
OutputT output,
463+
Instant timestamp,
464+
Collection<? extends BoundedWindow> windows,
465+
PaneInfo paneInfo,
466+
ValueKind valueKind) {
467+
outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo, valueKind);
468+
}
469+
450470
@Override
451471
public <T> void output(TupleTag<T> tag, T output) {
452472
checkNotNull(tag, "Tag passed to output cannot be null");
@@ -471,6 +491,20 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
471491
outputWindowedValue(tag, output, timestamp, elem.getWindows(), elem.getPaneInfo());
472492
}
473493

494+
@Override
495+
public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
496+
checkNotNull(tag, "Tag passed to outputWithKind cannot be null");
497+
builderSupplier
498+
.builder(output)
499+
.setValueKind(kind)
500+
.setReceiver(
501+
wv -> {
502+
checkTimestamp(elem.getTimestamp(), wv.getTimestamp());
503+
SimpleDoFnRunner.this.outputWindowedValue(tag, wv);
504+
})
505+
.output();
506+
}
507+
474508
@Override
475509
public <T> void outputWindowedValue(
476510
TupleTag<T> tag,
@@ -491,6 +525,27 @@ public <T> void outputWindowedValue(
491525
.output();
492526
}
493527

528+
public <T> void outputWindowedValue(
529+
TupleTag<T> tag,
530+
T output,
531+
Instant timestamp,
532+
Collection<? extends BoundedWindow> windows,
533+
PaneInfo paneInfo,
534+
ValueKind valueKind) {
535+
builderSupplier
536+
.builder(output)
537+
.setTimestamp(timestamp)
538+
.setWindows(windows)
539+
.setPaneInfo(paneInfo)
540+
.setValueKind(valueKind)
541+
.setReceiver(
542+
wv -> {
543+
checkTimestamp(elem.getTimestamp(), wv.getTimestamp());
544+
SimpleDoFnRunner.this.outputWindowedValue(tag, wv);
545+
})
546+
.output();
547+
}
548+
494549
@Override
495550
public Instant timestamp() {
496551
return elem.getTimestamp();
@@ -581,6 +636,11 @@ public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
581636
return elem.causedByDrain();
582637
}
583638

639+
@Override
640+
public ValueKind valueKind(DoFn<InputT, OutputT> doFn) {
641+
return elem.getValueKind();
642+
}
643+
584644
@Override
585645
public String timerId(DoFn<InputT, OutputT> doFn) {
586646
throw new UnsupportedOperationException(
@@ -862,6 +922,11 @@ public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
862922
return causedByDrain;
863923
}
864924

925+
@Override
926+
public ValueKind valueKind(DoFn<InputT, OutputT> doFn) {
927+
throw new UnsupportedOperationException("ValueKind parameters are not supported.");
928+
}
929+
865930
@Override
866931
public String timerId(DoFn<InputT, OutputT> doFn) {
867932
return timerId;
@@ -1008,6 +1073,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
10081073
outputWithTimestamp(mainOutputTag, output, timestamp);
10091074
}
10101075

1076+
@Override
1077+
public void outputWithKind(OutputT output, ValueKind kind) {
1078+
outputWithKind(mainOutputTag, output, kind);
1079+
}
1080+
10111081
@Override
10121082
public void outputWindowedValue(
10131083
OutputT output,
@@ -1030,6 +1100,12 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
10301100
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
10311101
}
10321102

1103+
@Override
1104+
public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
1105+
outputWindowedValue(
1106+
tag, output, timestamp(), Collections.singleton(window()), PaneInfo.NO_FIRING, kind);
1107+
}
1108+
10331109
@Override
10341110
public <T> void outputWindowedValue(
10351111
TupleTag<T> tag,
@@ -1048,6 +1124,25 @@ public <T> void outputWindowedValue(
10481124
.output();
10491125
}
10501126

1127+
public <T> void outputWindowedValue(
1128+
TupleTag<T> tag,
1129+
T output,
1130+
Instant timestamp,
1131+
Collection<? extends BoundedWindow> windows,
1132+
PaneInfo paneInfo,
1133+
ValueKind kind) {
1134+
checkTimestamp(timestamp(), timestamp);
1135+
1136+
builderSupplier
1137+
.builder(output)
1138+
.setTimestamp(timestamp)
1139+
.setWindows(windows)
1140+
.setPaneInfo(paneInfo)
1141+
.setValueKind(kind)
1142+
.setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv))
1143+
.output();
1144+
}
1145+
10511146
@Override
10521147
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
10531148
outputWindowedValue(mainOutputTag, windowedValue);
@@ -1177,6 +1272,11 @@ public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
11771272
"Cannot access time domain outside of @ProcessTimer method.");
11781273
}
11791274

1275+
@Override
1276+
public ValueKind valueKind(DoFn<InputT, OutputT> doFn) {
1277+
throw new UnsupportedOperationException("ValueKind parameters are not supported.");
1278+
}
1279+
11801280
@Override
11811281
public KeyT key() {
11821282
return key;
@@ -1279,6 +1379,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
12791379
outputWithTimestamp(mainOutputTag, output, timestamp);
12801380
}
12811381

1382+
@Override
1383+
public void outputWithKind(OutputT output, ValueKind kind) {
1384+
outputWithKind(mainOutputTag, output, kind);
1385+
}
1386+
12821387
@Override
12831388
public void outputWindowedValue(
12841389
OutputT output,
@@ -1301,6 +1406,17 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
13011406
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
13021407
}
13031408

1409+
@Override
1410+
public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
1411+
outputWindowedValue(
1412+
tag,
1413+
output,
1414+
timestamp(SimpleDoFnRunner.this.fn),
1415+
Collections.singleton(window()),
1416+
PaneInfo.NO_FIRING,
1417+
kind);
1418+
}
1419+
13041420
@Override
13051421
public <T> void outputWindowedValue(
13061422
TupleTag<T> tag,
@@ -1318,6 +1434,24 @@ public <T> void outputWindowedValue(
13181434
.output();
13191435
}
13201436

1437+
public <T> void outputWindowedValue(
1438+
TupleTag<T> tag,
1439+
T output,
1440+
Instant timestamp,
1441+
Collection<? extends BoundedWindow> windows,
1442+
PaneInfo paneInfo,
1443+
ValueKind kind) {
1444+
checkTimestamp(this.timestamp, timestamp);
1445+
builderSupplier
1446+
.builder(output)
1447+
.setTimestamp(timestamp)
1448+
.setWindows(windows)
1449+
.setPaneInfo(paneInfo)
1450+
.setValueKind(kind)
1451+
.setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv))
1452+
.output();
1453+
}
1454+
13211455
@Override
13221456
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
13231457
outputWindowedValue(mainOutputTag, windowedValue);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,13 +139,26 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
139139
* drain happened upstream
140140
*/
141141
CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL;
142+
ValueKind valueKind = ValueKind.INSERT;
142143
if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
143144
BeamFnApi.Elements.ElementMetadata elementMetadata =
144145
WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata());
145146
drainingValueFromUpstream =
146147
elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING
147148
? CausedByDrain.CAUSED_BY_DRAIN
148149
: CausedByDrain.NORMAL;
150+
if (elementMetadata.hasValueKind()) {
151+
valueKind =
152+
elementMetadata.getValueKind() == BeamFnApi.Elements.ValueKind.Enum.INSERT
153+
? ValueKind.INSERT
154+
: elementMetadata.getValueKind()
155+
== BeamFnApi.Elements.ValueKind.Enum.UPDATE_BEFORE
156+
? ValueKind.UPDATE_BEFORE
157+
: elementMetadata.getValueKind()
158+
== BeamFnApi.Elements.ValueKind.Enum.UPDATE_AFTER
159+
? ValueKind.UPDATE_AFTER
160+
: ValueKind.DELETE;
161+
}
149162
}
150163
if (valueCoder instanceof KvCoder) {
151164
KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) valueCoder;
@@ -156,7 +169,14 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
156169
T result =
157170
(T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data));
158171
return WindowedValues.of(
159-
result, timestampMillis, windows, paneInfo, null, null, drainingValueFromUpstream);
172+
result,
173+
timestampMillis,
174+
windows,
175+
paneInfo,
176+
null,
177+
null,
178+
drainingValueFromUpstream,
179+
valueKind);
160180
} else {
161181
notifyElementRead(data.available() + metadata.available());
162182
return WindowedValues.of(
@@ -166,7 +186,8 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
166186
paneInfo,
167187
null,
168188
null,
169-
drainingValueFromUpstream);
189+
drainingValueFromUpstream,
190+
valueKind);
170191
}
171192
}
172193

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
4141
import org.apache.beam.sdk.util.ByteStringOutputStream;
4242
import org.apache.beam.sdk.values.KV;
43+
import org.apache.beam.sdk.values.ValueKindUtil;
4344
import org.apache.beam.sdk.values.ValueWithRecordId;
4445
import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder;
4546
import org.apache.beam.sdk.values.WindowedValue;
@@ -220,7 +221,9 @@ public long add(WindowedValue<T> data) throws IOException {
220221
ByteString id = ByteString.EMPTY;
221222
// todo #33176 specify additional metadata in the future
222223
BeamFnApi.Elements.ElementMetadata additionalMetadata =
223-
BeamFnApi.Elements.ElementMetadata.newBuilder().build();
224+
BeamFnApi.Elements.ElementMetadata.newBuilder()
225+
.setValueKind(ValueKindUtil.toProto(data.getValueKind()))
226+
.build();
224227
ByteString metadata =
225228
encodeMetadata(
226229
stream, windowsCoder, data.getWindows(), data.getPaneInfo(), additionalMetadata);

0 commit comments

Comments
 (0)