Skip to content

Commit 4aff88e

Browse files
authored
Revert "[Dataflow Streaming] Prevent commit threads from sharing commit streams" (#37873)
1 parent 39c7b46 commit 4aff88e

1 file changed

Lines changed: 3 additions & 6 deletions

File tree

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -464,13 +464,10 @@ private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
464464
@SuppressWarnings("methodref.receiver.bound")
465465
WorkCommitter workCommitter =
466466
StreamingEngineWorkCommitter.builder()
467-
// Use a separate stream pool for each committer. This ensures the commit
468-
// threads are fully isolated.
469467
.setCommitWorkStreamFactory(
470-
() ->
471-
WindmillStreamPool.create(
472-
1, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream)
473-
.getCloseableStream())
468+
WindmillStreamPool.create(
469+
numCommitThreads, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream)
470+
::getCloseableStream)
474471
.setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
475472
.setNumCommitSenders(numCommitThreads)
476473
.setOnCommitComplete(this::onCompleteCommit)

0 commit comments

Comments
 (0)