首页 > 文章列表 > 事件流与事件溯源

事件流与事件溯源

Kafka go 实践
369 2024-04-20

事件流与事件溯源

事件流和事件溯源是事件驱动架构中两个相关但不同的概念。

事件流是将系统中发生的事件持续捕捉和记录的过程。这些事件可以立即进行处理和分析,也可以被存储以备后续分析。事件流通常应用于需要处理大量实时数据的系统,如金融交易或社交媒体平台。

以下是使用流行的Kafka消息系统在Go中进行事件流处理的简单示例:

package main

import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
)

func main() {
// 设置Kafka生产者以将事件发送到主题
writer := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "my-topic",
})

// 发送一些事件到主题
writer.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("key1"),
Value: []byte("value1"),
},
kafka.Message{
Key: []byte("key2"),
Value: []byte("value2"),
},
)

// 设置Kafka消费者以从主题读取事件
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "my-topic",
})

// 从主题读取事件
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("Received message: key=%s, value=%sn", string(msg.Key), string(msg.Value))
}
}

而事件溯源是一种构建系统的模式,将应用程序状态的所有变化存储为事件序列。这些事件然后可以用于在任何时间点重建应用程序的状态。事件溯源通常用于需要可审计性、可追溯性或合规性的系统,如金融系统或医疗系统。

以下是在Go中使用内存事件存储进行事件溯源的简单示例:

package main

import (
"fmt"
)

type Event struct {
Type string
Data interface{}
}

type EventStore struct {
events []Event
}

func (store *EventStore) Append(event Event) {
store.events = append(store.events, event)
}

func (store *EventStore) GetEvents() []Event {
return store.events
}

type Account struct {
idstring
balance int
store *EventStore
}

func NewAccount(id string, store *EventStore) *Account {
return &Account{
id:id,
balance: 0,
store: store,
}
}

func (account *Account) Deposit(amount int) {
event := Event{
Type: "deposit",
Data: amount,
}
account.store.Append(event)
account.balance += amount
}

func (account *Account) Withdraw(amount int) {
if account.balance >= amount {
event := Event{
Type: "withdraw",
Data: amount,
}
account.store.Append(event)
account.balance -= amount
}
}

func (account *Account) GetBalance() int {
return account.balance
}

func main() {
store := &EventStore{}
account := NewAccount("123", store)

account.Deposit(100)
account.Withdraw(50)
account.Deposit(25)

events := store.GetEvents()
for _, event := range events {
switch event.Type {
case "deposit":
amount := event.Data.(int)
fmt.Printf("Deposited %dn", amount)
case "withdraw":
amount := event.Data.(int)
fmt.Printf("Withdrew %dn", amount)
}
}

fmt.Printf("Final balance: %dn", account.GetBalance())
}

事件溯源是通过将每个对聚合的修改记录为事件并将其追加到连续流中的一种方法。要重建聚合的最终状态,需要按顺序读取这些事件,然后将其应用于聚合。这与在创建、读取、更新和删除(CRUD)系统中执行的即时修改形成对比。在CRUD系统中,对记录状态的任何更改都存储在数据库中,实质上覆盖了同

一聚合的先前版本。

价格变化已保存到Products表中后,只有价格会被更新,而其他部分将保持不变。然而,这种方法可能导致丢失先前价格和变更背后的上下文,如图5.1所示。

为了保留包括新价格和关键元数据(如调整原因)在内的信息,更改记录将被存储在Events表中作为事件。先前的价格将保持不变,以确保在必要时可以进行检索。

为了实现有效的事件溯源,建议使用提供强大一致性保证并使用乐观并发控制的事件存储。在实践中,这意味着当多个修改同时发生时,只有初始修改才能附加到流中。随后的修改可能需要重试或可能会失败。