Skip to content

Commit 48139dd

Browse files
authored
[IcebergIO] Support sort order on dynamic table creation (#38269)
* [IcebergIO] Support sort order on dynamic table creation * update changes and trigger IT * fix changes.md * address review feedback
1 parent 9e1c7a8 commit 48139dd

14 files changed

Lines changed: 406 additions & 11 deletions
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 4
3+
"modification": 5
44
}

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
## I/Os
6666

6767
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
68+
* IcebergIO: support declaring a table's sort order on dynamic table creation via the new `sort_fields` config ([#38269](https://github.com/apache/beam/issues/38269)).
6869

6970
## New Features / Improvements
7071

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,11 @@
9393
import org.apache.iceberg.PartitionKey;
9494
import org.apache.iceberg.PartitionSpec;
9595
import org.apache.iceberg.Snapshot;
96+
import org.apache.iceberg.SortOrder;
9697
import org.apache.iceberg.Table;
9798
import org.apache.iceberg.TableProperties;
9899
import org.apache.iceberg.avro.Avro;
100+
import org.apache.iceberg.catalog.Catalog;
99101
import org.apache.iceberg.catalog.TableIdentifier;
100102
import org.apache.iceberg.exceptions.AlreadyExistsException;
101103
import org.apache.iceberg.exceptions.NoSuchTableException;
@@ -145,19 +147,22 @@ public class AddFiles extends PTransform<PCollection<String>, PCollectionRowTupl
145147
private final int manifestFileSize;
146148
private final @Nullable String locationPrefix;
147149
private final @Nullable List<String> partitionFields;
150+
private final @Nullable List<String> sortFields;
148151
private final @Nullable Map<String, String> tableProps;
149152

150153
public AddFiles(
151154
IcebergCatalogConfig catalogConfig,
152155
String tableIdentifier,
153156
@Nullable String locationPrefix,
154157
@Nullable List<String> partitionFields,
158+
@Nullable List<String> sortFields,
155159
@Nullable Map<String, String> tableProps,
156160
@Nullable Integer manifestFileSize,
157161
@Nullable Duration intervalTrigger) {
158162
this.catalogConfig = catalogConfig;
159163
this.tableIdentifier = tableIdentifier;
160164
this.partitionFields = partitionFields;
165+
this.sortFields = sortFields;
161166
this.tableProps = tableProps;
162167
this.intervalTrigger = intervalTrigger;
163168
this.manifestFileSize =
@@ -193,6 +198,7 @@ public PCollectionRowTuple expand(PCollection<String> input) {
193198
tableIdentifier,
194199
locationPrefix,
195200
partitionFields,
201+
sortFields,
196202
tableProps))
197203
.withOutputTags(DATA_FILES, TupleTagList.of(ERRORS)));
198204
SchemaCoder<SerializableDataFile> sdfCoder;
@@ -267,6 +273,7 @@ static class ConvertToDataFile extends DoFn<String, SerializableDataFile> {
267273
public static final TupleTag<SerializableDataFile> DATA_FILES = new TupleTag<>();
268274
private final @Nullable String prefix;
269275
private final @Nullable List<String> partitionFields;
276+
private final @Nullable List<String> sortFields;
270277
private final @Nullable Map<String, String> tableProps;
271278
private transient @MonotonicNonNull ExecutorService executor;
272279
private transient @MonotonicNonNull LinkedList<Future<ProcessResult>> activeTasks;
@@ -281,11 +288,13 @@ public ConvertToDataFile(
281288
String identifier,
282289
@Nullable String prefix,
283290
@Nullable List<String> partitionFields,
291+
@Nullable List<String> sortFields,
284292
@Nullable Map<String, String> tableProps) {
285293
this.catalogConfig = catalogConfig;
286294
this.identifier = identifier;
287295
this.prefix = prefix;
288296
this.partitionFields = partitionFields;
297+
this.sortFields = sortFields;
289298
this.tableProps = tableProps;
290299
}
291300

@@ -522,12 +531,18 @@ private Table getOrCreateTable(String filePath, FileFormat format) throws IOExce
522531
try {
523532
org.apache.iceberg.Schema schema = getSchema(filePath, format);
524533
PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema);
534+
SortOrder sortOrder = SortOrderUtils.toSortOrder(sortFields, schema);
525535

526-
return tableProps == null
527-
? catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec)
528-
: catalogConfig
536+
Catalog.TableBuilder builder =
537+
catalogConfig
529538
.catalog()
530-
.createTable(TableIdentifier.parse(identifier), schema, spec, tableProps);
539+
.buildTable(tableId, schema)
540+
.withPartitionSpec(spec)
541+
.withSortOrder(sortOrder);
542+
if (tableProps != null) {
543+
builder.withProperties(tableProps);
544+
}
545+
return builder.create();
531546
} catch (AlreadyExistsException e2) { // if table already exists, just load it
532547
return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
533548
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,15 @@ public static Builder builder() {
106106
+ " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.")
107107
public abstract @Nullable Map<String, String> getTableProperties();
108108

109+
@SchemaFieldDescription(
110+
"Fields used to set the table's sort order, applied when the table is created. "
111+
+ "Each entry has the form `<term> [asc|desc] [nulls first|nulls last]`, where `<term>` "
112+
+ "is a field name or one of the partition transforms (e.g. `bucket(col, 4)`, `day(ts)`). "
113+
+ "Direction defaults to ascending; null order defaults to nulls-first for ascending and "
114+
+ "nulls-last for descending.\n"
115+
+ "For more information on sort orders, please visit https://iceberg.apache.org/spec/#sort-orders.")
116+
public abstract @Nullable List<String> getSortFields();
117+
109118
@SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
110119
public abstract @Nullable ErrorHandling getErrorHandling();
111120

@@ -127,6 +136,8 @@ public abstract static class Builder {
127136

128137
public abstract Builder setTableProperties(Map<String, String> props);
129138

139+
public abstract Builder setSortFields(List<String> sortFields);
140+
130141
public abstract Builder setErrorHandling(ErrorHandling errorHandling);
131142

132143
public abstract Configuration build();
@@ -172,6 +183,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
172183
configuration.getTable(),
173184
configuration.getLocationPrefix(),
174185
configuration.getPartitionFields(),
186+
configuration.getSortFields(),
175187
configuration.getTableProperties(),
176188
configuration.getManifestFileSize(),
177189
frequency != null ? Duration.standardSeconds(frequency) : null));

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergTableCreateConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import java.util.List;
2222
import java.util.Map;
2323
import org.apache.beam.sdk.schemas.Schema;
24+
import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
2425
import org.apache.iceberg.PartitionSpec;
26+
import org.apache.iceberg.SortOrder;
2527
import org.checkerframework.checker.nullness.qual.Nullable;
2628
import org.checkerframework.dataflow.qual.Pure;
2729

@@ -41,6 +43,16 @@ public PartitionSpec getPartitionSpec() {
4143
@Pure
4244
public abstract @Nullable List<String> getPartitionFields();
4345

46+
/** Sort order to apply when the table is dynamically created. */
47+
@Pure
48+
@SchemaIgnore
49+
public SortOrder getSortOrder() {
50+
return SortOrderUtils.toSortOrder(getSortFields(), getSchema());
51+
}
52+
53+
@Pure
54+
public abstract @Nullable List<String> getSortFields();
55+
4456
@Pure
4557
public abstract @Nullable Map<String, String> getTableProperties();
4658

@@ -55,6 +67,8 @@ public abstract static class Builder {
5567

5668
public abstract Builder setPartitionFields(@Nullable List<String> partitionFields);
5769

70+
public abstract Builder setSortFields(@Nullable List<String> sortFields);
71+
5872
public abstract Builder setTableProperties(@Nullable Map<String, String> tableProperties);
5973

6074
@Pure

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,16 @@ public static Builder builder() {
134134
+ " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.")
135135
public abstract @Nullable Map<String, String> getTableProperties();
136136

137+
@SchemaFieldDescription(
138+
"Fields used to set the table's sort order, applied when the table is created. "
139+
+ "Each entry has the form `<term> [asc|desc] [nulls first|nulls last]`, where `<term>` "
140+
+ "is a field name or one of the partition transforms (e.g. `bucket(col, 4)`, `day(ts)`). "
141+
+ "Direction defaults to ascending; null order defaults to nulls-first for ascending and "
142+
+ "nulls-last for descending. Note: this sets the table's declared sort order as metadata; "
143+
+ "it does not cause Beam to physically sort records before writing.\n"
144+
+ "For more information on sort orders, please visit https://iceberg.apache.org/spec/#sort-orders.")
145+
public abstract @Nullable List<String> getSortFields();
146+
137147
@AutoValue.Builder
138148
public abstract static class Builder {
139149
public abstract Builder setTable(String table);
@@ -158,6 +168,8 @@ public abstract static class Builder {
158168

159169
public abstract Builder setTableProperties(Map<String, String> tableProperties);
160170

171+
public abstract Builder setSortFields(List<String> sortFields);
172+
161173
public abstract Configuration build();
162174
}
163175

@@ -223,6 +235,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
223235
FileFormat.PARQUET.toString(),
224236
rows.getSchema(),
225237
configuration.getPartitionFields(),
238+
configuration.getSortFields(),
226239
configuration.getTableProperties(),
227240
configuration.getDrop(),
228241
configuration.getKeep(),

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,22 @@ class PortableIcebergDestinations implements DynamicDestinations {
3434
private final String fileFormat;
3535

3636
private final @Nullable List<String> partitionFields;
37+
private final @Nullable List<String> sortFields;
3738
private final @Nullable Map<String, String> tableProperties;
3839

3940
public PortableIcebergDestinations(
4041
String destinationTemplate,
4142
String fileFormat,
4243
Schema inputSchema,
4344
@Nullable List<String> partitionFields,
45+
@Nullable List<String> sortFields,
4446
@Nullable Map<String, String> tableProperties,
4547
@Nullable List<String> fieldsToDrop,
4648
@Nullable List<String> fieldsToKeep,
4749
@Nullable String onlyField) {
4850
this.interpolator = new RowStringInterpolator(destinationTemplate, inputSchema);
4951
this.partitionFields = partitionFields;
52+
this.sortFields = sortFields;
5053
this.tableProperties = tableProperties;
5154
RowFilter rf = new RowFilter(inputSchema);
5255

@@ -86,6 +89,7 @@ public IcebergDestination instantiateDestination(String dest) {
8689
IcebergTableCreateConfig.builder()
8790
.setSchema(getDataSchema())
8891
.setPartitionFields(partitionFields)
92+
.setSortFields(sortFields)
8993
.setTableProperties(tableProperties)
9094
.build())
9195
.setFileFormat(FileFormat.fromString(fileFormat))

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.iceberg.PartitionField;
5050
import org.apache.iceberg.PartitionKey;
5151
import org.apache.iceberg.PartitionSpec;
52+
import org.apache.iceberg.SortOrder;
5253
import org.apache.iceberg.Table;
5354
import org.apache.iceberg.catalog.Catalog;
5455
import org.apache.iceberg.catalog.Namespace;
@@ -329,6 +330,7 @@ Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
329330
@Nullable IcebergTableCreateConfig createConfig = destination.getTableCreateConfig();
330331
PartitionSpec partitionSpec =
331332
createConfig != null ? createConfig.getPartitionSpec() : PartitionSpec.unpartitioned();
333+
SortOrder sortOrder = createConfig != null ? createConfig.getSortOrder() : SortOrder.unsorted();
332334
Map<String, String> tableProperties =
333335
createConfig != null && createConfig.getTableProperties() != null
334336
? createConfig.getTableProperties()
@@ -357,13 +359,20 @@ Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
357359
} catch (NoSuchTableException e) { // Otherwise, create the table
358360
org.apache.iceberg.Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
359361
try {
360-
table = catalog.createTable(identifier, tableSchema, partitionSpec, tableProperties);
362+
table =
363+
catalog
364+
.buildTable(identifier, tableSchema)
365+
.withPartitionSpec(partitionSpec)
366+
.withSortOrder(sortOrder)
367+
.withProperties(tableProperties)
368+
.create();
361369
LOG.info(
362370
"Created Iceberg table '{}' with schema: {}\n"
363-
+ ", partition spec: {}, table properties: {}",
371+
+ ", partition spec: {}, sort order: {}, table properties: {}",
364372
identifier,
365373
tableSchema,
366374
partitionSpec,
375+
sortOrder,
367376
tableProperties);
368377
} catch (AlreadyExistsException ignored) {
369378
// race condition: another worker already created this table

0 commit comments

Comments
 (0)