Skip to content

feat(transactions): Transaction added to garantee only once paradigm.…#297

Open
marcosschroh wants to merge 1 commit into
masterfrom
feat/add-transantions
Open

feat(transactions): Transaction added to garantee only once paradigm.…#297
marcosschroh wants to merge 1 commit into
masterfrom
feat/add-transantions

Conversation

@marcosschroh

Copy link
Copy Markdown
Collaborator

… Related to #265

@github-actions

github-actions Bot commented Mar 6, 2025

Copy link
Copy Markdown
Contributor
PR Preview Action v1.6.0

🚀 View preview at
https://kpn.github.io/kstreams/pr-preview/pr-297/

Built to branch gh-pages at 2025-03-06 13:31 UTC.
Preview will be ready when the GitHub Pages deployment is complete.

@marcosschroh marcosschroh requested a review from woile March 6, 2025 13:36
Comment thread docs/transactions.md
@@ -0,0 +1,241 @@
`Kafka 0.11.0` includes support for `idempotent` and `transactional` capabilities in the `producer`. Idempotent delivery ensures that messages are delivered `exactly once`
to a particular topic partition during the lifetime of a `single producer`. Transactional delivery allows producers to send data to multiple partitions such that either
all messages are successfully delivered, or none of them are. Together, these capabilities enable `*exactly once semantics*` in Kafka.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think transactions lack some explanation here. I would add that "exactly once semantics" are for only one scenario:

  • consume-process-produce under the same kafka cluster.

Comment thread kstreams/types.py
) -> typing.Awaitable[RecordMetadata]: ...


class Transaction(typing.Protocol):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a docstring to this class please. It's not clear why we have a protocol Transaction and then we have transaction.Transaction. An explanation on the docstring would clarify.

consider also renaming this to TransactionProtocol.

Comment thread kstreams/types.py
from .clients import Producer

if typing.TYPE_CHECKING:
from . import transaction # pragma: no cover

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way to avoid this circular import? could transaction.Transaction also be a protocol?

Comment thread docs/transactions.md
- Transaction state is stored in a new internal topic `__transaction_state`. This topic is not created until the the first attempt to use a transactional request API. There are several settings to control the topic's configuration.
- `Topics` which are included in transactions should be configured for durability. In particular, the `replication.factor` should be at least `3`, and the `min.insync.replicas` for these topics should be set to `2`
- `Transactions` always add overhead, meaning that more effort is needed to produce events and the consumer have to apply filters
For example, `transaction.state.log.min.isr` controls the minimum ISR for this topic.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For discoverability I would add links to the kafka settings.

Suggested change
For example, `transaction.state.log.min.isr` controls the minimum ISR for this topic.
For example, [`transaction.state.log.min.isr`](https://kafka.apache.org/documentation/#brokerconfigs_transaction.state.log.min.isr) controls the minimum ISR for this topic.

Comment thread docs/transactions.md

## Usage

From the `kstreams` point of view, the `transaction pattern` is a `context manager` that will `start` a transaction and then `commit` or `aboort` it. Always a `transaction` starts when an event is send. Once that we have the `context` we can send events in two ways:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
From the `kstreams` point of view, the `transaction pattern` is a `context manager` that will `start` a transaction and then `commit` or `aboort` it. Always a `transaction` starts when an event is send. Once that we have the `context` we can send events in two ways:
From the `kstreams` point of view, the "transaction pattern" is a `context manager` that will `start` a transaction and then `commit` or `abort` it. A `transaction` always starts when an event is sent. Once we have the `context` we can send events in two ways:

Comment thread docs/transactions.md

From the `kstreams` point of view, the `transaction pattern` is a `context manager` that will `start` a transaction and then `commit` or `aboort` it. Always a `transaction` starts when an event is send. Once that we have the `context` we can send events in two ways:

Using the `StreamEngine` directly:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a side-note, the recommended way should be with the type annotation IMO.

@woile

woile commented Mar 10, 2025

Copy link
Copy Markdown
Member

Excellent work! 💪🏻

@marcosschroh

marcosschroh commented Mar 11, 2025

Copy link
Copy Markdown
Collaborator Author

I have hit an issue in aiokafka. Currently I am blocked by aio-libs/aiokafka#1098

@Masqueey Masqueey left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What an insane commit! Incredible that you can manage to implement such a large feature and manage to adapt the testing client to work and test your changes accordingly.

I left some spelling corrections and some questions, but mostly very impressed with the work you did.

Comment thread docs/transactions.md
@@ -0,0 +1,241 @@
`Kafka 0.11.0` includes support for `idempotent` and `transactional` capabilities in the `producer`. Idempotent delivery ensures that messages are delivered `exactly once`

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although we call it Kafka, formally it is called Apache Kafka. I think this first mention would be good to use the full name. Further references can stay Kafka.

Comment thread docs/transactions.md
It is important to notice that:

- `Transaction` always start from a `send` (producer)
- Events sent to one or more topics will only be visible on consumers after the transaction is committed.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Events or (offset) commits"

Comment thread docs/transactions.md
- To use transactions, a `transaction id` (unique id per transaction) must be set prior an event is sent. If you do not provide one `kstreams` will auto generate one for you.
- Transaction state is stored in a new internal topic `__transaction_state`. This topic is not created until the the first attempt to use a transactional request API. There are several settings to control the topic's configuration.
- `Topics` which are included in transactions should be configured for durability. In particular, the `replication.factor` should be at least `3`, and the `min.insync.replicas` for these topics should be set to `2`
- `Transactions` always add overhead, meaning that more effort is needed to produce events and the consumer have to apply filters

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all sentences in this itemisation end in a period (.), I would change them to make sure all of them do. (Sorry for nitpicking.)

Comment thread docs/transactions.md
- `Topics` which are included in transactions should be configured for durability. In particular, the `replication.factor` should be at least `3`, and the `min.insync.replicas` for these topics should be set to `2`
- `Transactions` always add overhead, meaning that more effort is needed to produce events and the consumer have to apply filters
For example, `transaction.state.log.min.isr` controls the minimum ISR for this topic.
- `Streams` (consumers) will have to filter or not transactional events

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe more: "Consumer will have to set per Stream whether to filter out events based on their transaction header." (Or something like that, I might not understand the process well enough yet.)

Comment thread docs/transactions.md
queue "Topic A" as topic_a

producer -> topic_a: "produce with a transaction"
topic_a --> stream_a: "consume only transactional events"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's more about the success of the transaction, more than whether or not it was part of a transaction, right? So more "filter out aborted transactional events" than "only transactional"?

Comment thread kstreams/engine.py
self._producer: typing.Optional[typing.Type[Producer]] = None
self._producer: typing.Optional[Producer] = None
self._streams: typing.List[Stream] = []
self._transaction_manager = TransactionManager(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come this TransactionManager no longer needs to be imported?

@@ -74,7 +132,10 @@ def __init__(self, group_id: Optional[str] = None, **kwargs) -> None:

# Called to make sure that has all the kafka attributes like _coordinator

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"that it has"

@@ -74,7 +132,10 @@ def __init__(self, group_id: Optional[str] = None, **kwargs) -> None:

# Called to make sure that has all the kafka attributes like _coordinator
# so it will behave like an real Kafka Consumer

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"a" real Kafka

Comment thread tests/test_client.py


@pytest.mark.asyncio
async def test_producer_with_transaction_no_consumer(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not entirely get what this test is supposed to test. Not sure what this scenario is representing.

await asyncio.wait_for(stream.start(), timeout=0.1)

await stream.stop()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only test I'm still missing here is the produce-then-commit transaction test. (Although I'm aware that that one doesn't work because of the bug in aiokafka.)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants