이벤트 소싱 event-sourcing 패턴 JavaScript로 구현하기

😢 이 페이지는 다음 주소로 변경될 예정입니다.

얼마 전 이벤트 소싱 패턴에 대한 글을 작성했다. 글을 읽고나서 js로 간략하게 구현해봤던 내용을 글로 정리했다. 개념을 나눠 설명하기 위해 CQRS 부분은 다른 글을 통해 덧붙이려고 한다. 여기서 사용하는 구현은 프로덕션에서 사용하기에 부족한 점이 많기 때문에 개념 이해에 중점을 맞춰 코드를 보면 좋겠다. 여기서는 은행 계좌를 예시로 작성했으며 구현하는 부분은 다음과 같다.

  • AggregateRoot: 이벤트가 반영될 집합체
  • BankAccount: 집합체 구현
  • *Event: 각각의 이벤트
  • EventSourcingRepository: 이벤트를 사용해서 데이터를 다루는 리포지터리 클래스
  • InMemoryForTestingEventStore: 이벤트 저장소 동작을 확인하기 위한 클래스
  • EventStoreData: 이벤트 저장소에서 저장되는 데이터 개체 클래스

전체 코드는 gist에서 확인할 수 있으며 jsbin.com에서 테스트해볼 수 있다.


가장 먼저 집합체를 구현한다. 집합체는 일련의 이벤트를 투영할 수 있는 개체다. 은행 계좌를 열고, 닫고, 입출금을 한다면 그 정보의 집합체는 은행 계좌가 될 것이다. 먼저 AggregateRoot 클래스를 작성한다. 이 클래스는 이벤트를 받고 해당 메소드를 호출한다. js는 메서드 오버로딩이 없기 때문에 handle(event) 메소드가 apply{이벤트명} 메소드를 찾는다.

class AggregateRoot {
    apply(event) {
        this.handle(event)
        return this
    }

    handle(event) {
        var eventName = event.constructor.name
        var eventMethod = `apply${eventName}`

        if (! this[eventMethod]) {
            throw new TypeError(`${eventMethod} is not a function`)
        }

        this[eventMethod](event)
    }
}

은행계좌를 개설하는 이벤트를 작성한다.

class OpenedEvent {
    constructor(id: number, name: string) {
        this.id = id
        this.name = name
    }
}

은행 계좌 클래스를 추가한다. 이 클래스로 생성한 개체가 집합체가 되며 이벤트를 통해 갱신된다.

class BankAccount extends AggregateRoot {
    static open(id: number, name: string) {
        var bankAccount = new BankAccount
        bankAccount.apply(new OpenedEvent(id, name))
        return bankAccount
    }

    applyOpenedEvent(event) {
        this.id = event.id
        this.name = event.name

        this.closed = false
        this.balance = 0
    }
}

아래 예제에서 이벤트로 프로퍼티가 갱신되는 것을 확인할 수 있다.

var bankAccount = BankAccount.open(123456, 'Koala')
console.log(bankAccount.id, bankAccount.name) // 123456, 'Koala'

이제 여러 이벤트를 추가한다.

class WithdrawnEvent {
  constructor(id: number, amount: number) {
    this.id = id
    this.amount = amount
  }
}

class DepositedEvent {
  constructor(id: number, amount: number) {
    this.id = id
    this.amount = amount
  }
}

class ClosedEvent {
  constructor(id: number) {
    this.id = id
  }
}

메소드도 추가한다.

class BankAccount extends AggregateRoot {
    // ... 이전 코드
    withdraw(amount) {
        if (this.closed) {
            throw new Error(`${this.id} account is closed.`)
        }
        this.apply(new WithdrawnEvent(this.id, amount))
        return this
    }

    deposit(amount) {
        if (this.closed) {
            throw new Error(`${this.id} account is closed.`)
        }
        this.apply(new DepositedEvent(this.id, amount))
        return this
    }

    close() {
        if (!this.closed) {
            this.apply(new ClosedEvent(this.id))
        }
        return this
    }

    applyWithdrawnEvent(event) {
        this.balance -= event.amount
    }

    applyDepositedEvent(event) {
        this.balance += event.amount
    }

    applyClosedEvent(event) {
        this.closed = true
    }
}
var bankAccount = BankAccount.open(123456,  'Koala')
  .deposit(10000)
  .withdraw(1000)
  .deposit(3000)
  .close()

console.log(bankAccount.name, bankAccount.balance, bankAccount.closed ? 'closed' : 'opened')
// Koala 12000 closed

집합체를 사용해서 내부적으로는 이벤트를 통해 데이터가 갱신되고 있지만 개체의 일반적인 사용과 큰 차이가 없는 것을 확인할 수 있다. 각각의 메소드는 실제로 개체의 정보를 갱신하지 않고 이벤트를 생성하는 역할만 하며 이벤트를 적용하는 apply* 메소드에서만 실질적인 변화가 일어나고 있다.

정말 이 은행 계좌는 일련의 이벤트로 결과를 얻을 수 있을까? 다음 예를 보면 알 수 있다.

var events = [
    new OpenedEvent(123456, 'Koala'),
    new DepositedEvent(123456, 10000),
    new WithdrawnEvent(123456, 1000),
    new DepositedEvent(123456, 3000),
    new ClosedEvent(123456),
]

var bankAccount = new BankAccount
events.forEach(event => bankAccount.apply(event))

console.log(bankAccount.name, bankAccount.balance, bankAccount.closed ? 'closed' : 'opened')
// Koala 12000 closed

동일한 결과를 확인할 수 있다. 위 코드는 AggregateRoot에 다음처럼 추가한다.

class AggregateRoot {
    // ... 이전 코드
    initializeState(events) {
        events.forEach(event => this.apply(event))
    }
}

이제 일련의 이벤트를 다루고 저장할 수 있도록 리포지터리를 만든다.

class EventSourcingRepository {
    constructor(eventStore, aggregateType) {
        this.eventStore = eventStore
        this.aggregateType = aggregateType
    }

    load(id) {
        var events = this.eventStore.load(id)

        var aggregate = Object.create(this.aggregateType.prototype)
        aggregate.initializeState(events)
        return aggregate
    }

    save(aggregate) {
        var uncommittedEvents = aggregate.getUncommittedEvents()
        this.eventStore.append(uncommittedEvents)
    }
}

저장하지 않은 이벤트를 가져올 수 있도록 getUncommittedEvents()AggregateRoot에 구현한다. 또한 상태 초기화 시 저장하지 않은 이벤트로 다루지 않도록 initializeState 메소드도 변경한다.

class AggregateRoot {
    // ...
    uncommittedEvents = []

    getUncommittedEvents() {
        var events = this.uncommittedEvents
        this.uncommittedEvents = []
        return events
    }

    apply(event) {
        this.handle(event)
        this.uncommittedEvents.push(event)
        return this
    }

    initializeState(events) {
        events.forEach(event => this.handle(event))
    }
}

여기서는 예로 간단한 이벤트 저장소를 구현해서 사용한다. 이벤트를 저장하기 위한 구조로 EventStoreData를 다음처럼 작성한다.

class EventStoreData {
    constructor(rootId, event, createdAt) {
        this.rootId = rootId
        this.event = event
        this.createdAt = createdAt
    }
}

이벤트가 데이터베이스에 저장될 때 EventStoreData의 정의대로 저장된다고 생각해보자. 관계형 데이터베이스를 예로 든다면 이 클래스가 테이블의 스키마를 반영하고 있다고 볼 수 있다. 다음은 이 개체를 그대로 메모리에서 사용하는 예제 이벤트 저장소 클래스다.

class InMemoryForTestingEventStore {
    constructor(events) {
        this.data = events ? this.convertEvents(events) : []
    }

    load(rootId) {
        return this.data
            .filter(data => data.rootId === rootId)
            .map(data => data.event)
    }

    append(events) {
        var newData = this.convertEvents(events)
        this.data = this.data.concat(newData)
    }

    convertEvents(events) {
        return events.map(event => this.convertEventToData(event))
    }

    convertEventToData(event) {
        var createdAt = new Date().getTime()
        return new EventStoreData(event.id, event, createdAt)
    }
}

다음처럼 리포지터리로 저장하고 불러올 수 있게 되었다.

var eventStore = new InMemoryForTestingEventStore()
var repository = new EventSourcingRepository(eventStore, BankAccount)

var bankAccount = BankAccount.open(654321,  'Edward')
  .deposit(20000)
  .withdraw(1000)
  .withdraw(1000)

repository.save(bankAccount)
var loaded = repository.load(654321)
console.log(bankAccount.name, bankAccount.balance, bankAccount.closed ? 'closed' : 'opened')
// Edward 18000 opened

eventStore에 저장된 이벤트 저장소 데이터를 살펴보면 이 동작이 좀 더 와닿는다.

console.log(eventStore.data)

eventStore.data


앞에서도 말했지만 여기서 사용하는 구현은 프로덕션에서 사용하기에 부족한 점이 많다. 예를 들면 AggregateRoot에 최종 일관성을 확인하기 위한 version도 구현되어 있지 않고 EventData에 id도 정의되어 있지 않다. 패턴에 대해 이해가 되었다면 실제로 구현한 패키지를 확인하면 도움이 된다.

그리고 CQRS 패턴을 함께 사용하지 않는다면 이벤트소싱은 반쪽에 불과하다. 여기서 살펴본 이벤트소싱에 더해 CQRS를 적용하면 유연하고 강력한 아키텍처를 구성할 수 있다. CQRS 패턴은 다음 글에서 살펴보려고 한다.

이벤트 소싱 event-sourcing 패턴 정리

😢 이 페이지는 다음 주소로 변경될 예정입니다.

최근 프로젝트에서 audit을 생성하는 코드를 작성하면서 이벤트 소싱 패턴을 찾아보게 되었다. 여러 포스트를 통해 접해본 내용이지만 실제로 구현해보지 않아서 크게 와닿지 않았었다. 특히 용어가 익숙하지 않았는데 읽으며 궁금해서 찾아봤던 순서대로 정리했다.


전통적으로 사용하는 CRUD 모델을 생각해보자. 데이터를 갱신하기 위해서는 데이터 저장소에서 해당 데이터를 가져오는 작업이 필요하다. 동시성 문제가 나타날 수도 있고 확장성을 낮추는 지점이 될 수도 있다. 이벤트소싱 패턴은 일련의 이벤트를 통해 데이터를 조작하는 접근 방식이다. 데이터에 영향을 주는 모든 동작을 이벤트라는 저수준의 데이터로 관리하는 것으로 여러 문제를 해결할 수 있다. 물론 단점도 여러가지 존재하므로 필요에 따라 적용해야 한다.

이벤트소싱 패턴도 성숙한 아키텍처이기 때문에 다양한 주제로 세분화되어 있고 각각의 키워드를 알아야 쉽게 찾아볼 수 있다. 예를 들면 이벤트를 저장하고 불러오는 방식 하나만으로도 큰 주제다. 특히 이 패턴을 제대로 쓰기 위해서는 CQRS를 빼놓을 수 없기 때문에 두 이야기가 뒤섞이기도 한다.

이벤트 소싱

앞서 적은 것처럼 이벤트소싱 패턴은 일련의 이벤트를 통해 데이터를 조작한다. 현재의 상태는 변화의 총합으로 표현할 수 있다. 이 패턴을 설명할 때는 쇼핑몰을 많이 예로 든다.

다음과 같이 일련의 이벤트가 있다고 생각해보자.

id root_id event
1 1 카트 생성함
2 1 상품1 추가함
3 1 상품2 추가함
4 1 상품2 제거함
5 1 배송정보 입력함

이 이벤트를 개체에 하나씩 적용한다면 최종적으로 생성된 카트에 상품2가 추가되어 있고 배송정보까지 입력된 개체를 얻을 수 있게 된다. 기존 CRUD 모델과 비교한다면 이미 정형화된 모델에 상태를 저장하는 과정에서 나타나는 문제를 해결할 수 있다. CRUD에서는 카트에 추가되었다가 제거된 상품 목록을 뽑고 싶다고 했을 때 별도로 그 특정한 상태를 저장하지 않는다면 어려운 작업이 될 것이다. 게다가 작업을 한다 하더라도 그 작업 이후의 데이터에 대해서만 볼 수 있는 한계점이 있다. 이처럼 정형화되지 않은 데이터인 이벤트를 저장하고 있다는 점에서 더욱 유연한 변화가 가능하다.

여기서 사용되는 개체는 도메인 모델이고 흔히 집합체(aggregate)로 불리며 root_idaggregateRoot로 삼아서 각 개체로 전환한다.

var cart = events.reduce((aggregate, event) => aggregate.apply(event), new CartAggregate);
console.log(cart.getItems()) // ['상품1']
console.log(cart.shippingInfoExists()) // true

내 경우에는 다음과 같은 궁금점이 생겼다.

  • 이벤트는 어떻게 데이터로 저장하고 복원하지?
  • 이벤트가 많이 쌓이면 느려지지 않나?
  • 데이터가 필요할 때마다 매번 전부 이벤트를 돌려봐야 한다면 번거롭지 않을까?
  • 이벤트는 어떻게 다시 재생하지?

이벤트 저장(Eventstore)

이벤트 저장에 사용할 수 있는 저장소는 크게 세 가지로 분류된다.

  • 이벤트 저장에 특화된 데이터 저장소 사용 (e.g. eventstore.org)
  • NoSQL을 사용
  • 관계형 데이터베이스 사용

이벤트는 데이터베이스에 직렬화(serialize)해서 저장하고 역직렬화(deserialize)해서 사용한다. 각 저장하는 방식은 전략에 따라 다른데 관계형 테이터베이스를 사용한다면 이벤트명(주로 이벤트 타입명)과 데이터(주로 payload) 등으로 분리해서 저장한다. 이벤트의 정규 구조가 단순하기 때문에 단순히 위에서 언급한 데이터베이스가 아니더라도 용도에 맞게 선택할 수 있다.

스냅샷

스냅샷은 이벤트 저장에서 사용할 수 있는 전략이다. 기준에 따라서 이벤트가 많이 쌓이면 중간에 스냅샷을 만들고 그 스냅샷 이후의 이벤트만 가져와서 사용하는 방식이다. 특화된 저장소라면 시스템이 알아서 처리해주지만 그 외에는 이 문제를 고민해서 저장 방식을 설계해야 한다.

명령과 조회 책임 분리

필요한 데이터를 얻기 위해 모든 이벤트를 반복해서 재생하는 일은 많은 자원을 필요로 한다. 일련의 이벤트를 여러 장의 필름이라고 생각한다면 현재의 상태란 이 여러 필름을 한 위치에 투영(projecting)해서 나타난 그림이라고 볼 수 있겠다. 매번 모든 필름을 겹쳐 투영하는 대신 현재의 상태를 어딘가 저장하고 있다면 좀 더 쉽게 데이터를 사용할 수 있을 것이다.

현재 상품1의 재고량을 파악하기 위해 모든 이벤트를 투영하는 대신 재고 테이블에서 상품1의 재고량을 바로 찾아보는 것이 훨씬 쉽다. 다시 말하면 저장은 이벤트로 하지만 조회는 투영된 데이터, 구체화(materialised)된 데이터를 대상으로 수행하고 싶은 것이다. 이런 맥락에서 자연스럽게 명령과 조회의 책임을 분리하는 패턴(Command and Query Responsibility Segregation, CQRS)을 적용하게 된다. 명령으로 이벤트를 쌓고 리드 모델에서 조회하는 것이다. 명령과 조회에서의 책임 분리는 이 아키텍처의 장점을 끌어올릴 수 있다.

위에서 예시로 든 테이블의 경우는 id를 increment id와 같이 지정했지만 CQRS에서는 무작정 생성해도 충돌을 피할 수 있는 만큼 큰 id(예로 128bit GUID)를 생성해서 사용한다.

리드 모델

그렇다면 리드 모델은 어떻게 최신 데이터를 계속 유지할 수 있을까? 새 이벤트는 이벤트 버스(event bus)를 통해 전파되는데 리드 모델에 변화를 투영하는 경우에도 이벤트 리스너로서 발행되는 이벤트를 관찰하고 있다가 리드 모델을 갱신해야 하는 이벤트가 발생하는 순간에 갱신할 수 있다.

이벤트에 의해 갱신되는 리드 모델은 자료를 모두 유실하더라도 이벤트를 모두 저장하고 있다면 모든 이벤트를 이벤트 버스에 보내는 것으로 다시 복구할 수 있게 된다. 구체화된 데이터가 자료의 원형이 아니라 이벤트가 자료의 원형이기 때문에 필요에 따라서 언제든 비정규화된 조회를 새로 생성하고 삭제하는 것이 가능하다.

하지만 이 방식을 적용하면 리드 모델을 비동기적으로 처리하기 때문에 최종 일관성(eventual consistency)을 유지하게 된다. 리드 모델 갱신이 빠르다면 다시 조회를 수행했을 때 최신의 자료를 볼 수 있겠지만 연산이 많거나 부하가 커서 갱신이 느려진다면 이벤트는 생성되었지만 리드 모델은 갱신되지 않아 일시적으로 이전 데이터를 조회하게 될 수 있다. 분산 환경에서는 흔하게 나타나는 문제로 요구사항과 충돌한다면 이런 부분에 대한 대책을 세워야 한다.


간략하게 JavaScript로 이벤트소싱 패턴을 구현해 글로 작성했다.

더 읽기