Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,26 @@ message Elements {
}
}

// The type of change operation represented by a Change Data Capture (CDC)
// record.
message ValueKind {
enum Enum {
// Indicates a new record was created in the source system.
INSERT = 0;

// Indicates the state of a record immediately before an update occurred.
// This is typically used to identify the previous values of modified columns
// or to locate the record via its primary key.
UPDATE_BEFORE = 1;

// Indicates the state of a record immediately after an update occurred.
// Represents the current, valid state of the record following the change.
UPDATE_AFTER = 2;

// Indicates that an existing record was removed from the source system.
DELETE = 3;
}
}

// Element metadata passed as part of WindowedValue to make WindowedValue
// extensible and backward compatible
message ElementMetadata {
Expand All @@ -770,6 +790,9 @@ message Elements {
// across IOs - Kafka, PubSub, http.
// Example value: congo=t61rcWkgMzE
optional string tracestate = 3;
// (Optional) The kind of value for CDC metadata.
// If missing or unspecified, implies INSERT for backwards compatibility.
optional ValueKind.Enum value_kind = 4;
}

// Represent the encoded user timer for a given instruction, transform and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -202,6 +203,11 @@ public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
return processContext.causedByDrain();
}

/**
 * Returns the {@link ValueKind} reported by the process context.
 *
 * @param doFn unused; the result is read from {@code processContext}
 */
@Override
public ValueKind valueKind(DoFn<InputT, OutputT> doFn) {
return processContext.valueKind();
}

@Override
public RestrictionTracker<?, ?> restrictionTracker() {
return processContext.tracker;
Expand Down Expand Up @@ -407,6 +413,11 @@ public CausedByDrain causedByDrain() {
return element.causedByDrain();
}

/** Returns the {@link ValueKind} carried by the element currently being processed. */
@Override
public ValueKind valueKind() {
return element.getValueKind();
}

@Override
public PipelineOptions getPipelineOptions() {
return pipelineOptions;
Expand Down Expand Up @@ -442,6 +453,7 @@ public <T> void output(TupleTag<T> tag, T value) {

@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
noteOutput();
outputReceiver.output(
tag,
WindowedValues.of(
Expand All @@ -454,6 +466,31 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp)
element.causedByDrain()));
}

/**
 * Emits {@code output} with the given {@code kind} by delegating to the tagged overload with
 * {@code mainOutputTag}.
 */
@Override
public void outputWithKind(OutputT output, ValueKind kind) {
outputWithKind(mainOutputTag, output, kind);
}

/**
 * Emits {@code value} to {@code tag} tagged with {@code kind}.
 *
 * <p>The emitted value reuses the input element's timestamp, windows, pane info, record id,
 * record offset and drain flag; only the value and its kind come from the caller.
 */
@Override
public <T> void outputWithKind(TupleTag<T> tag, T value, ValueKind kind) {
  noteOutput();

  // The output inherits the input element's timestamp, so that is the timestamp reported to a
  // timestamp-observing watermark estimator.
  final Instant inheritedTimestamp = element.getTimestamp();
  if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
    ((TimestampObservingWatermarkEstimator) watermarkEstimator)
        .observeTimestamp(inheritedTimestamp);
  }

  final WindowedValue<T> windowedOutput =
      WindowedValues.of(
          value,
          inheritedTimestamp,
          element.getWindows(),
          element.getPaneInfo(),
          element.getRecordId(),
          element.getRecordOffset(),
          element.causedByDrain(),
          kind);
  outputReceiver.output(tag, windowedOutput);
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowingStrategy;
Expand Down Expand Up @@ -438,6 +439,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
outputWithTimestamp(mainOutputTag, output, timestamp);
}

/**
 * Emits {@code output} with the given {@code kind} by delegating to the tagged overload with
 * {@code mainOutputTag}.
 */
@Override
public void outputWithKind(OutputT output, ValueKind kind) {
outputWithKind(mainOutputTag, output, kind);
}

@Override
public void outputWindowedValue(
OutputT output,
Expand Down Expand Up @@ -471,6 +477,22 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
outputWindowedValue(tag, output, timestamp, elem.getWindows(), elem.getPaneInfo());
}

/**
 * Emits {@code output} to {@code tag} tagged with {@code kind}, copying the timestamp, windows
 * and pane info from the current input element.
 *
 * <p>The receiver validates the built value's timestamp against the input element's timestamp
 * via {@code checkTimestamp} before handing the value to the runner.
 */
@Override
public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
  final Instant inputTimestamp = elem.getTimestamp();
  builderSupplier
      .builder(output)
      .setTimestamp(inputTimestamp)
      .setWindows(elem.getWindows())
      .setPaneInfo(elem.getPaneInfo())
      .setValueKind(kind)
      .setReceiver(
          windowedOutput -> {
            checkTimestamp(inputTimestamp, windowedOutput.getTimestamp());
            SimpleDoFnRunner.this.outputWindowedValue(tag, windowedOutput);
          })
      .output();
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
Expand Down Expand Up @@ -506,6 +528,11 @@ public Instant timestamp() {
return elem.getRecordOffset();
}

/** Returns the {@link ValueKind} carried by the current input element. */
@Override
public ValueKind valueKind() {
return elem.getValueKind();
}

/** Returns the windows of the current input element, exactly as the element reports them. */
public Collection<? extends BoundedWindow> windows() {
return elem.getWindows();
}
Expand Down Expand Up @@ -581,6 +608,11 @@ public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
return elem.causedByDrain();
}

/**
 * Returns the {@link ValueKind} carried by the current input element.
 *
 * @param doFn unused; the result is read from {@code elem}
 */
@Override
public ValueKind valueKind(DoFn<InputT, OutputT> doFn) {
return elem.getValueKind();
}

@Override
public String timerId(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -862,6 +894,11 @@ public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
return causedByDrain;
}

/**
 * Not supported in this context.
 *
 * @param doFn unused
 * @throws UnsupportedOperationException always
 */
@Override
public ValueKind valueKind(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException("ValueKind parameters are not supported.");
}

@Override
public String timerId(DoFn<InputT, OutputT> doFn) {
return timerId;
Expand Down Expand Up @@ -1008,6 +1045,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
outputWithTimestamp(mainOutputTag, output, timestamp);
}

/**
 * Emits {@code output} with the given {@code kind} by delegating to the tagged overload with
 * {@code mainOutputTag}.
 */
@Override
public void outputWithKind(OutputT output, ValueKind kind) {
outputWithKind(mainOutputTag, output, kind);
}

@Override
public void outputWindowedValue(
OutputT output,
Expand All @@ -1030,6 +1072,19 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}

/**
 * Emits {@code output} to {@code tag} tagged with {@code kind}.
 *
 * <p>The value is built with this context's {@code timestamp}, a singleton collection holding
 * {@code window()}, and {@link PaneInfo#NO_FIRING}, then forwarded to the enclosing runner.
 */
@Override
public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
  // Validate before constructing anything, matching the order of the other output methods.
  checkTimestamp(timestamp(), timestamp);

  builderSupplier
      .builder(output)
      .setTimestamp(timestamp)
      .setWindows(Collections.singleton(window()))
      .setPaneInfo(PaneInfo.NO_FIRING)
      .setValueKind(kind)
      .setReceiver(
          windowedOutput -> {
            SimpleDoFnRunner.this.outputWindowedValue(tag, windowedOutput);
          })
      .output();
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
Expand Down Expand Up @@ -1177,6 +1232,11 @@ public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
"Cannot access time domain outside of @ProcessTimer method.");
}

/**
 * Not supported in this context.
 *
 * @param doFn unused
 * @throws UnsupportedOperationException always
 */
@Override
public ValueKind valueKind(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException("ValueKind parameters are not supported.");
}

@Override
public KeyT key() {
return key;
Expand Down Expand Up @@ -1279,6 +1339,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
outputWithTimestamp(mainOutputTag, output, timestamp);
}

/**
 * Emits {@code output} with the given {@code kind} by delegating to the tagged overload with
 * {@code mainOutputTag}.
 */
@Override
public void outputWithKind(OutputT output, ValueKind kind) {
outputWithKind(mainOutputTag, output, kind);
}

@Override
public void outputWindowedValue(
OutputT output,
Expand All @@ -1301,6 +1366,18 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}

/**
 * Emits {@code output} to {@code tag} tagged with {@code kind}.
 *
 * <p>The value is built with this context's {@code timestamp}, a singleton collection holding
 * {@code window()}, and {@link PaneInfo#NO_FIRING}, then forwarded to the enclosing runner.
 *
 * @param tag the output to emit to
 * @param output the value to emit
 * @param kind the change kind to attach to the emitted value
 */
@Override
public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
  // NOTE(review): both arguments appear to resolve to the same field here, since this method has
  // no timestamp parameter; kept as-is for parity with the sibling output methods.
  checkTimestamp(this.timestamp, timestamp);
  builderSupplier
      .builder(output)
      .setTimestamp(timestamp)
      .setWindows(Collections.singleton(window()))
      .setPaneInfo(PaneInfo.NO_FIRING)
      // Propagate the caller-supplied kind; without this the parameter is silently dropped.
      .setValueKind(kind)
      .setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv))
      .output();
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.util.construction.SplittableParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -176,6 +177,7 @@ public DirectPipelineResult run(Pipeline pipeline) {

performRewrites(pipeline);
MetricsEnvironment.setMetricsSupported(true);
WindowedValues.WindowedValueCoder.setMetadataSupported(true);
try {
DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
pipeline.traverseTopologically(graphVisitor);
Expand Down Expand Up @@ -233,6 +235,7 @@ public DirectPipelineResult run(Pipeline pipeline) {
return result;
} finally {
MetricsEnvironment.setMetricsSupported(false);
WindowedValues.WindowedValueCoder.setMetadataSupported(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
Expand Down Expand Up @@ -1379,6 +1380,11 @@ public T getValue() {
return value;
}

/** Always reports {@link ValueKind#INSERT} for this value. */
@Override
public ValueKind getValueKind() {
return ValueKind.INSERT;
}

@Override
public CausedByDrain causedByDrain() {
return CausedByDrain.NORMAL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public void processElement(
.setTimestamp(kv.getValue().getTimestamp())
.setWindow(kv.getValue().getWindow())
.setPaneInfo(kv.getValue().getPaneInfo())
.setValueKind(kv.getValue().getValueKind())
.output();
}
}));
Expand Down
Loading
Loading