Skip to content

Commit 2857603

Browse files
authored
Add outstanding parameters to processElement and onTimer signatures (#37933)
* add outstanding parameters to processElement and onTimer signatures
1 parent a61ef0a commit 2857603

9 files changed

Lines changed: 376 additions & 4 deletions

File tree

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,22 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
576576
return timestamp();
577577
}
578578

579+
@Override
580+
public @Nullable String currentRecordId(DoFn<InputT, OutputT> doFn) {
581+
return currentRecordId();
582+
}
583+
584+
@Override
585+
public @Nullable Long currentRecordOffset(DoFn<InputT, OutputT> doFn) {
586+
return currentRecordOffset();
587+
}
588+
589+
@Override
590+
public Instant fireTimestamp(DoFn<InputT, OutputT> doFn) {
591+
throw new UnsupportedOperationException(
592+
"Cannot access fire timestamp outside of @OnTimer method.");
593+
}
594+
579595
@Override
580596
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
581597
return elem.causedByDrain();
@@ -857,6 +873,23 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
857873
return timestamp();
858874
}
859875

876+
@Override
877+
public @Nullable String currentRecordId(DoFn<InputT, OutputT> doFn) {
878+
throw new UnsupportedOperationException(
879+
"Cannot access record id outside of @ProcessElement method.");
880+
}
881+
882+
@Override
883+
public @Nullable Long currentRecordOffset(DoFn<InputT, OutputT> doFn) {
884+
throw new UnsupportedOperationException(
885+
"Cannot access record offset outside of @ProcessElement method.");
886+
}
887+
888+
@Override
889+
public Instant fireTimestamp(DoFn<InputT, OutputT> doFn) {
890+
return fireTimestamp();
891+
}
892+
860893
@Override
861894
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
862895
return causedByDrain;
@@ -1166,6 +1199,24 @@ public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
11661199
throw new UnsupportedOperationException("CausedByDrain parameters are not supported.");
11671200
}
11681201

1202+
@Override
1203+
public @Nullable String currentRecordId(DoFn<InputT, OutputT> doFn) {
1204+
throw new UnsupportedOperationException(
1205+
"Cannot access record id outside of @ProcessElement method.");
1206+
}
1207+
1208+
@Override
1209+
public @Nullable Long currentRecordOffset(DoFn<InputT, OutputT> doFn) {
1210+
throw new UnsupportedOperationException(
1211+
"Cannot access record offset outside of @ProcessElement method.");
1212+
}
1213+
1214+
@Override
1215+
public Instant fireTimestamp(DoFn<InputT, OutputT> doFn) {
1216+
throw new UnsupportedOperationException(
1217+
"Cannot access fire timestamp outside of @OnTimer method.");
1218+
}
1219+
11691220
@Override
11701221
public String timerId(DoFn<InputT, OutputT> doFn) {
11711222
throw new UnsupportedOperationException("Timer parameters are not supported.");

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -595,8 +595,10 @@ public interface MultiOutputReceiver {
595595
*
596596
* <p>The method annotated with {@code @OnTimer} may have parameters according to the same logic
597597
* as {@link ProcessElement}, but limited to the {@link BoundedWindow}, {@link State} subclasses,
598-
* and {@link Timer}. State and timer parameters must be annotated with their {@link StateId} and
599-
* {@link TimerId} respectively.
598+
* {@link Timer}, {@link FireTimestamp}, {@link Timestamp}, {@link Key}, {@link TimeDomain},
599+
* {@link PipelineOptions}, {@link OutputReceiver}, {@link MultiOutputReceiver}, and {@link
600+
* org.apache.beam.sdk.values.CausedByDrain CausedByDrain}. State and timer parameters must be
601+
* annotated with their {@link StateId} and {@link TimerId} respectively.
600602
*/
601603
@Documented
602604
@Retention(RetentionPolicy.RUNTIME)
@@ -613,8 +615,11 @@ public interface MultiOutputReceiver {
613615
*
614616
* <p>The method annotated with {@code @OnTimerFamily} may have parameters according to the same
615617
* logic as {@link ProcessElement}, but limited to the {@link BoundedWindow}, {@link State}
616-
* subclasses, and {@link org.apache.beam.sdk.state.TimerMap}. State and timer parameters must be
617-
* annotated with their {@link StateId} and {@link TimerId} respectively.
618+
* subclasses, {@link org.apache.beam.sdk.state.TimerMap}, {@link FireTimestamp}, {@link
619+
* Timestamp}, {@link Key}, {@link TimeDomain}, {@link PipelineOptions}, {@link OutputReceiver},
620+
* {@link MultiOutputReceiver}, and {@link org.apache.beam.sdk.values.CausedByDrain
621+
* CausedByDrain}. State and timer parameters must be annotated with their {@link StateId} and
622+
* {@link TimerId} respectively.
618623
*/
619624
@Documented
620625
@Retention(RetentionPolicy.RUNTIME)
@@ -705,6 +710,12 @@ public interface MultiOutputReceiver {
705710
* <li>If one of its arguments is tagged with the {@link Element} annotation, then it will be
706711
* passed the current element being processed. The argument type must match the input type
707712
* of this DoFn exactly, or both types must have equivalent schemas registered.
713+
* <li>If one of its arguments is tagged with the {@link CurrentRecordId} annotation, then it
714+
* will be passed the record id of the current element being processed; the argument must be
715+
* of type {@link String}.
716+
* <li>If one of its arguments is tagged with the {@link CurrentRecordOffset} annotation, then
717+
* it will be passed the record offset of the current element being processed; the argument
718+
* must be of type {@link Long}.
708719
* <li>If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be
709720
* passed the timestamp of the current element being processed; the argument must be of type
710721
* {@link Instant}.
@@ -795,6 +806,12 @@ public interface MultiOutputReceiver {
795806
* <li>If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be
796807
* passed the timestamp of the current element being processed; the argument must be of type
797808
* {@link Instant}.
809+
* <li>If one of its arguments is tagged with the {@link CurrentRecordId} annotation, then it
810+
* will be passed the record ID of the current element being processed; the argument must be
811+
* of type {@link String}.
812+
* <li>If one of its arguments is tagged with the {@link CurrentRecordOffset} annotation, then
813+
* it will be passed the record offset of the current element being processed; the argument
814+
* must be of type {@link Long}.
798815
* <li>If one of its arguments is of the type {@link WatermarkEstimator}, then it will be passed
799816
* the watermark estimator.
800817
* <li>If one of its arguments is of the type {@link ManualWatermarkEstimator}, then it will be
@@ -838,6 +855,18 @@ public interface MultiOutputReceiver {
838855
@Target(ElementType.PARAMETER)
839856
public @interface Element {}
840857

858+
/** Parameter annotation for the input element record id for {@link ProcessElement}. */
859+
@Documented
860+
@Retention(RetentionPolicy.RUNTIME)
861+
@Target(ElementType.PARAMETER)
862+
public @interface CurrentRecordId {}
863+
864+
/** Parameter annotation for the input element record offset for {@link ProcessElement}. */
865+
@Documented
866+
@Retention(RetentionPolicy.RUNTIME)
867+
@Target(ElementType.PARAMETER)
868+
public @interface CurrentRecordOffset {}
869+
841870
/**
842871
* Parameter annotation for the restriction for {@link GetSize}, {@link SplitRestriction}, {@link
843872
* GetInitialWatermarkEstimatorState}, {@link NewWatermarkEstimator}, and {@link NewTracker}
@@ -860,6 +889,12 @@ public interface MultiOutputReceiver {
860889
@Target(ElementType.PARAMETER)
861890
public @interface Timestamp {}
862891

892+
/** Parameter annotation for the firing timestamp for {@link OnTimer}. */
893+
@Documented
894+
@Retention(RetentionPolicy.RUNTIME)
895+
@Target(ElementType.PARAMETER)
896+
public @interface FireTimestamp {}
897+
863898
/** Parameter annotation for the SideInput for a {@link ProcessElement} method. */
864899
@Documented
865900
@Retention(RetentionPolicy.RUNTIME)

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,11 @@
7777
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BundleFinalizerParameter;
7878
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
7979
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.CausedByDrainParameter;
80+
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.CurrentRecordIdParameter;
81+
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.CurrentRecordOffsetParameter;
8082
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ElementParameter;
8183
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.FinishBundleContextParameter;
84+
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.FireTimestampParameter;
8285
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OnTimerContextParameter;
8386
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter;
8487
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.PaneInfoParameter;
@@ -128,6 +131,9 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
128131
public static final String SCHEMA_ELEMENT_PARAMETER_METHOD = "schemaElement";
129132
public static final String TIMESTAMP_PARAMETER_METHOD = "timestamp";
130133
public static final String CAUSED_BY_DRAIN_PARAMETER_METHOD = "causedByDrain";
134+
public static final String CURRENT_RECORD_ID_PARAMETER_METHOD = "currentRecordId";
135+
public static final String CURRENT_RECORD_OFFSET_PARAMETER_METHOD = "currentRecordOffset";
136+
public static final String FIRE_TIMESTAMP_PARAMETER_METHOD = "fireTimestamp";
131137
public static final String BUNDLE_FINALIZER_PARAMETER_METHOD = "bundleFinalizer";
132138
public static final String OUTPUT_ROW_RECEIVER_METHOD = "outputRowReceiver";
133139
public static final String TIME_DOMAIN_PARAMETER_METHOD = "timeDomain";
@@ -1265,6 +1271,33 @@ public StackManipulation dispatch(DoFnSignature.Parameter.TimerIdParameter p) {
12651271
TIMER_ID_PARAMETER_METHOD, DoFn.class)));
12661272
}
12671273

1274+
@Override
1275+
public StackManipulation dispatch(CurrentRecordIdParameter p) {
1276+
return new StackManipulation.Compound(
1277+
pushDelegate,
1278+
MethodInvocation.invoke(
1279+
getExtraContextFactoryMethodDescription(
1280+
CURRENT_RECORD_ID_PARAMETER_METHOD, DoFn.class)));
1281+
}
1282+
1283+
@Override
1284+
public StackManipulation dispatch(CurrentRecordOffsetParameter p) {
1285+
return new StackManipulation.Compound(
1286+
pushDelegate,
1287+
MethodInvocation.invoke(
1288+
getExtraContextFactoryMethodDescription(
1289+
CURRENT_RECORD_OFFSET_PARAMETER_METHOD, DoFn.class)));
1290+
}
1291+
1292+
@Override
1293+
public StackManipulation dispatch(FireTimestampParameter p) {
1294+
return new StackManipulation.Compound(
1295+
pushDelegate,
1296+
MethodInvocation.invoke(
1297+
getExtraContextFactoryMethodDescription(
1298+
FIRE_TIMESTAMP_PARAMETER_METHOD, DoFn.class)));
1299+
}
1300+
12681301
@Override
12691302
public StackManipulation dispatch(DoFnSignature.Parameter.KeyParameter p) {
12701303
return new StackManipulation.Compound(

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,17 @@ interface ArgumentProvider<InputT, OutputT> {
218218
/** Provide a reference to the input element timestamp. */
219219
Instant timestamp(DoFn<InputT, OutputT> doFn);
220220

221+
/** Provide a reference to the record id of the current element. */
222+
@Nullable
223+
String currentRecordId(DoFn<InputT, OutputT> doFn);
224+
225+
/** Provide a reference to the record offset of the current element. */
226+
@Nullable
227+
Long currentRecordOffset(DoFn<InputT, OutputT> doFn);
228+
229+
/** Provide a reference to the firing timestamp of the current timer. */
230+
Instant fireTimestamp(DoFn<InputT, OutputT> doFn);
231+
221232
/** Provide a reference to the caused by drain. */
222233
CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn);
223234

@@ -329,6 +340,24 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
329340
String.format("Timestamp unsupported in %s", getErrorContext()));
330341
}
331342

343+
@Override
344+
public @Nullable String currentRecordId(DoFn<InputT, OutputT> doFn) {
345+
throw new UnsupportedOperationException(
346+
String.format("RecordId unsupported in %s", getErrorContext()));
347+
}
348+
349+
@Override
350+
public @Nullable Long currentRecordOffset(DoFn<InputT, OutputT> doFn) {
351+
throw new UnsupportedOperationException(
352+
String.format("RecordOffset unsupported in %s", getErrorContext()));
353+
}
354+
355+
@Override
356+
public Instant fireTimestamp(DoFn<InputT, OutputT> doFn) {
357+
throw new UnsupportedOperationException(
358+
String.format("FireTimestamp unsupported in %s", getErrorContext()));
359+
}
360+
332361
@Override
333362
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
334363
throw new UnsupportedOperationException(
@@ -524,6 +553,21 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
524553
return delegate.timestamp(doFn);
525554
}
526555

556+
@Override
557+
public @Nullable String currentRecordId(DoFn<InputT, OutputT> doFn) {
558+
return delegate.currentRecordId(doFn);
559+
}
560+
561+
@Override
562+
public @Nullable Long currentRecordOffset(DoFn<InputT, OutputT> doFn) {
563+
return delegate.currentRecordOffset(doFn);
564+
}
565+
566+
@Override
567+
public Instant fireTimestamp(DoFn<InputT, OutputT> doFn) {
568+
return delegate.fireTimestamp(doFn);
569+
}
570+
527571
@Override
528572
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
529573
return delegate.causedByDrain(doFn);

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,12 @@ public <ResultT> ResultT match(Cases<ResultT> cases) {
345345
return cases.dispatch((BundleFinalizerParameter) this);
346346
} else if (this instanceof CausedByDrainParameter) {
347347
return cases.dispatch((CausedByDrainParameter) this);
348+
} else if (this instanceof CurrentRecordIdParameter) {
349+
return cases.dispatch((CurrentRecordIdParameter) this);
350+
} else if (this instanceof CurrentRecordOffsetParameter) {
351+
return cases.dispatch((CurrentRecordOffsetParameter) this);
352+
} else if (this instanceof FireTimestampParameter) {
353+
return cases.dispatch((FireTimestampParameter) this);
348354
} else if (this instanceof KeyParameter) {
349355
return cases.dispatch((KeyParameter) this);
350356
} else {
@@ -375,6 +381,12 @@ public interface Cases<ResultT> {
375381

376382
ResultT dispatch(TaggedOutputReceiverParameter p);
377383

384+
ResultT dispatch(CurrentRecordIdParameter p);
385+
386+
ResultT dispatch(CurrentRecordOffsetParameter p);
387+
388+
ResultT dispatch(FireTimestampParameter p);
389+
378390
ResultT dispatch(OnTimerContextParameter p);
379391

380392
ResultT dispatch(WindowParameter p);
@@ -462,6 +474,21 @@ public ResultT dispatch(TimeDomainParameter p) {
462474
return dispatchDefault(p);
463475
}
464476

477+
@Override
478+
public ResultT dispatch(CurrentRecordIdParameter p) {
479+
return dispatchDefault(p);
480+
}
481+
482+
@Override
483+
public ResultT dispatch(CurrentRecordOffsetParameter p) {
484+
return dispatchDefault(p);
485+
}
486+
487+
@Override
488+
public ResultT dispatch(FireTimestampParameter p) {
489+
return dispatchDefault(p);
490+
}
491+
465492
@Override
466493
public ResultT dispatch(OnTimerContextParameter p) {
467494
return dispatchDefault(p);
@@ -566,6 +593,12 @@ public ResultT dispatch(KeyParameter p) {
566593
new AutoValue_DoFnSignature_Parameter_CausedByDrainParameter();
567594
private static final OnWindowExpirationContextParameter ON_WINDOW_EXPIRATION_CONTEXT_PARAMETER =
568595
new AutoValue_DoFnSignature_Parameter_OnWindowExpirationContextParameter();
596+
private static final CurrentRecordIdParameter CURRENT_RECORD_ID_PARAMETER =
597+
new AutoValue_DoFnSignature_Parameter_CurrentRecordIdParameter();
598+
private static final CurrentRecordOffsetParameter CURRENT_RECORD_OFFSET_PARAMETER =
599+
new AutoValue_DoFnSignature_Parameter_CurrentRecordOffsetParameter();
600+
private static final FireTimestampParameter FIRE_TIMESTAMP_PARAMETER =
601+
new AutoValue_DoFnSignature_Parameter_FireTimestampParameter();
569602

570603
/** Returns a {@link ProcessContextParameter}. */
571604
public static ProcessContextParameter processContext() {
@@ -592,6 +625,21 @@ public static CausedByDrainParameter causedByDrainParameter() {
592625
return CAUSED_BY_DRAIN_PARAMETER;
593626
}
594627

628+
/** Returns a {@link CurrentRecordIdParameter}. */
629+
public static CurrentRecordIdParameter currentRecordIdParameter() {
630+
return CURRENT_RECORD_ID_PARAMETER;
631+
}
632+
633+
/** Returns a {@link CurrentRecordOffsetParameter}. */
634+
public static CurrentRecordOffsetParameter currentRecordOffsetParameter() {
635+
return CURRENT_RECORD_OFFSET_PARAMETER;
636+
}
637+
638+
/** Returns a {@link FireTimestampParameter}. */
639+
public static FireTimestampParameter fireTimestampParameter() {
640+
return FIRE_TIMESTAMP_PARAMETER;
641+
}
642+
595643
public static ElementParameter elementParameter(TypeDescriptor<?> elementT) {
596644
return new AutoValue_DoFnSignature_Parameter_ElementParameter(elementT);
597645
}
@@ -754,6 +802,36 @@ public abstract static class CausedByDrainParameter extends Parameter {
754802
CausedByDrainParameter() {}
755803
}
756804

805+
/**
806+
* Descriptor for a {@link Parameter} of type {@link DoFn.RecordId}.
807+
*
808+
* <p>All such descriptors are equal.
809+
*/
810+
@AutoValue
811+
public abstract static class CurrentRecordIdParameter extends Parameter {
812+
CurrentRecordIdParameter() {}
813+
}
814+
815+
/**
816+
* Descriptor for a {@link Parameter} of type {@link DoFn.RecordOffset}.
817+
*
818+
* <p>All such descriptors are equal.
819+
*/
820+
@AutoValue
821+
public abstract static class CurrentRecordOffsetParameter extends Parameter {
822+
CurrentRecordOffsetParameter() {}
823+
}
824+
825+
/**
826+
* Descriptor for a {@link Parameter} of type {@link DoFn.FireTimestamp}.
827+
*
828+
* <p>All such descriptors are equal.
829+
*/
830+
@AutoValue
831+
public abstract static class FireTimestampParameter extends Parameter {
832+
FireTimestampParameter() {}
833+
}
834+
757835
/**
758836
* Descriptor for a {@link Parameter} of type {@link DoFn.Element}.
759837
*

0 commit comments

Comments
 (0)