BACK_TO_WRITING

Building a Production-Ready Digital Wallet: Kafka, PostgreSQL, and the Reality of Eventual Consistency

What I learned building a distributed wallet system with Spring Boot, Kafka, and PostgreSQL - from optimistic locking nightmares to outbox pattern salvation

12 min read
microservicesspring-bootkafkapostgresqldistributed-systemsevent-driven
## The Challenge I wanted to understand how real financial systems work. Not the theory—the actual implementation details. So I built a digital wallet system from scratch with two hard requirements: 1. **Strong consistency for money** - balances must be exact, no lost updates 2. **Eventual consistency for history** - audit trail can lag by milliseconds The twist? I split these concerns into two microservices communicating via Kafka. What could go wrong? Spoiler: A lot. But that's where the learning happened. ## What I Actually Built The system is deliberately simple but production-ready: **Wallet Service (Port 8080)** - Create wallets - Fund wallets (add money) - Transfer money between wallets - Publishes events to Kafka **History Service (Port 8081)** - Consumes events from Kafka - Builds immutable audit trail - Provides transaction history APIs **Shared Infrastructure** - PostgreSQL database (both services) - Apache Kafka (event streaming) - Docker Compose (orchestration) ``` ┌─────────────┐ ┌──────────────┐ │ Client │ │ Client │ └──────┬──────┘ └──────┬───────┘ │ POST /wallets/{id}/fund │ GET /wallets/{id}/history ▼ ▼ ┌──────────────┐ ┌──────────────┐ │ Wallet │──┐ ┌──│ History │ │ Service │ │ Kafka Events │ │ Service │ │ │ │ │ │ │ │ - Balances │ │ ┌────────────┐ │ │ - Audit Trail│ │ - Transfers │──┼─►│ Kafka │──┼─►│ - Queries │ │ - Txn Log │ │ │ │ │ │ │ └──────┬───────┘ │ └────────────┘ │ └──────┬───────┘ │ │ │ │ └──────────┼──────────────────┼─────────┘ │ │ └───────┬──────────┘ ▼ ┌──────────────┐ │ PostgreSQL │ │ │ │ - wallets │ │ - wallet_txns│ │ - events │ └──────────────┘ ``` ## The Problems I Had to Solve ### Problem 1: Lost Updates (The $50 That Disappeared) My first implementation was naive: ```java @Transactional public Wallet fundWallet(String walletId, BigDecimal amount) { Wallet wallet = walletRepository.findById(walletId).orElseThrow(); wallet.setBalance(wallet.getBalance().add(amount)); // WRONG return walletRepository.save(wallet); } ``` **What happened:** I ran a concurrent test with 100 threads funding the same wallet with $1 each. Expected final balance: $100. Actual balance: $73. **The problem:** Lost updates. Here's the timeline: ``` Thread A: Read balance = $50 Thread B: Read balance = $50 Thread A: Write balance = $51 (50 + 1) Thread B: Write balance = $51 (50 + 1) ← Overwrote Thread A! Final: $51 (should be $52) Multiply by 100 threads → $27 lost ``` **The Solution: Optimistic Locking** ```java @Entity public class Wallet { @Id private String id; private BigDecimal balance; @Version // ← This saves everything private Long version; } ``` Now when two threads try to update: ```sql -- Thread A UPDATE wallets SET balance = 51, version = 6 WHERE id = 'abc' AND version = 5; Success -- Thread B UPDATE wallets SET balance = 51, version = 6 WHERE id = 'abc' AND version = 5; No rows updated (version is now 6!) -- JPA throws OptimisticLockException -- Spring retries Thread B automatically -- Thread B reads fresh data (balance = 51, version = 6) -- Thread B updates to 52, version = 7 ✓ ``` **Result:** 100 concurrent operations, final balance = $100.00. Perfect. ### Problem 2: Transfer Deadlocks Transfers are where things got interesting. I needed to update TWO wallets atomically. **First attempt:** ```java @Transactional public void transfer(String from, String to, BigDecimal amount) { Wallet fromWallet = walletRepository.findById(from).orElseThrow(); Wallet toWallet = walletRepository.findById(to).orElseThrow(); fromWallet.deduct(amount); toWallet.addFunds(amount); walletRepository.saveAll(List.of(fromWallet, toWallet)); } ``` **Concurrent transfer scenario:** ``` Transfer A: wallet-999 → wallet-111 Transfer B: wallet-111 → wallet-999 Timeline: T1: A locks wallet-999 (optimistic lock) T2: B locks wallet-111 (optimistic lock) T3: A tries to lock wallet-111 → BLOCKED T4: B tries to lock wallet-999 → BLOCKED DEADLOCK! PostgreSQL kills one transaction ``` **The Solution: Sorted Pessimistic Locking** ```java @Transactional public void transfer(String from, String to, BigDecimal amount) { // Always lock in sorted order List<String> ids = Arrays.asList(from, to); Collections.sort(ids); // Alphabetical order // Pessimistic lock: SELECT ... FOR UPDATE List<Wallet> wallets = walletRepository.findByIdInOrderById(ids); Wallet fromWallet = wallets.stream() .filter(w -> w.getId().equals(from)) .findFirst().orElseThrow(); Wallet toWallet = wallets.stream() .filter(w -> w.getId().equals(to)) .findFirst().orElseThrow(); fromWallet.deduct(amount); toWallet.addFunds(amount); walletRepository.saveAll(Arrays.asList(fromWallet, toWallet)); } ``` **Why this works:** Both transfers now lock wallets in the same order [111, 999]: ``` Transfer A: Sort → [111, 999] → Lock 111 → Lock 999 Transfer B: Sort → [111, 999] → Try lock 111 (waits for A) → Eventually succeeds No circular wait = No deadlock ``` This is the classic solution to the dining philosophers problem applied to database transactions. ### Problem 3: The Event That Never Arrived Early implementation: ```java @Transactional public Wallet fundWallet(String walletId, BigDecimal amount) { // Update database wallet.addFunds(amount); walletRepository.save(wallet); // Publish to Kafka kafkaTemplate.send("wallet_events", event); // Not part of transaction! return wallet; } ``` **The disaster scenario:** 1. Database transaction commits 2. Application publishes to Kafka 3. Kafka is down 4. Event lost forever 5. Wallet shows $100, history shows $0 **The Solution: Outbox Pattern** Instead of publishing directly to Kafka: ```java @Transactional public Wallet fundWallet(String walletId, BigDecimal amount) { wallet.addFunds(amount); walletRepository.save(wallet); // Save event to outbox table (same transaction!) OutboxEvent outboxEvent = new OutboxEvent( UUID.randomUUID().toString(), "WALLET_FUNDED", eventData ); outboxRepository.save(outboxEvent); // Both succeed or both fail (atomic) } ``` Background job publishes events: ```java @Scheduled(fixedDelay = 1000) // Every 1 second @Transactional public void publishOutbox() { List<OutboxEvent> pending = outboxRepository .findTop100ByPublishedFalseOrderByCreatedAtAsc(); for (OutboxEvent event : pending) { try { kafkaTemplate.send("wallet_events", convertToKafkaEvent(event)); event.markPublished(); outboxRepository.save(event); } catch (Exception e) { log.error("Failed to publish, will retry", e); // Don't mark as published → retry next time } } } ``` **Result:** Even if Kafka is down for hours, events eventually get published. No data loss. ### Problem 4: Duplicate Events (The $100 That Became $200) Kafka guarantees "at-least-once" delivery. That "at-least" part matters: ``` History Service: 1. Receives event: WALLET_FUNDED $100 2. Saves to database 3. About to acknowledge to Kafka... CRASH! 4. Restarts 5. Kafka redelivers same event 6. Saves to database again 7. History shows: Funded $100, Funded $100 (duplicate!) ``` **The Solution: Idempotency** ```java @KafkaListener(topics = "wallet_events") @Transactional public void consumeEvent(WalletEvent event, Acknowledgment ack) { // Check if already processed if (eventRepository.existsByTransactionId(event.getTransactionId())) { log.warn("Duplicate event, skipping: {}", event.getTransactionId()); ack.acknowledge(); // Still acknowledge to stop redelivery return; } // Process event TransactionEvent historyEvent = new TransactionEvent( UUID.randomUUID().toString(), event.getWalletId(), event.getUserId(), event.getAmount(), event.getEventType(), event.getTransactionId() // ← Deduplication key ); eventRepository.save(historyEvent); ack.acknowledge(); // Tell Kafka we're done } ``` **Result:** Process same event 1 time or 100 times = same outcome. ## The Reality of Eventual Consistency Here's what actually happens when you fund a wallet: ``` T=0ms: Client sends POST /wallets/abc/fund T=10ms: Wallet Service updates balance in database T=11ms: Client receives 200 OK (balance updated!) T=15ms: Event published to Kafka T=50ms: History Service receives event T=60ms: History Service saves event T=61ms: Client queries history... might not be there yet! Lag: ~50ms on average (sometimes 200ms under load) ``` For a financial system, this is **acceptable** for audit trails. Users care that their balance is correct NOW. They can tolerate seeing "Processing..." in transaction history for 50ms. **The UI reflects this:** ```json GET /api/wallets/abc-123 { "balance": 100.00, "status": "ACTIVE" } GET /api/wallets/abc-123/history { "events": [], "status": "SYNCING", "message": "Transaction history updating..." } // 100ms later... GET /api/wallets/abc-123/history { "events": [ {"type": "WALLET_FUNDED", "amount": 100.00} ], "status": "SYNCED" } ``` ## Testing Was Everything ### Concurrent Funding Test ```java @Test void shouldHandleConcurrentFunding() throws Exception { Wallet wallet = walletService.createWallet("alice"); ExecutorService executor = Executors.newFixedThreadPool(100); CountDownLatch latch = new CountDownLatch(100); for (int i = 0; i < 100; i++) { executor.submit(() -> { try { walletService.fundWallet(wallet.getId(), new BigDecimal("1.00")); } finally { latch.countDown(); } }); } latch.await(); Wallet updated = walletService.getWallet(wallet.getId()); assertThat(updated.getBalance()) .isEqualByComparingTo(new BigDecimal("100.00")); // Exact! List<Transaction> txns = transactionRepository .findByWalletId(wallet.getId()); assertThat(txns).hasSize(100); // All recorded } ``` **Result:** PASSED. Every single transaction recorded, no money lost. ### Idempotency Test ```java @Test void shouldHandleDuplicateEvents() { String transactionId = UUID.randomUUID().toString(); WalletFundedEvent event = new WalletFundedEvent(); event.setTransactionId(transactionId); event.setWalletId("test-wallet"); event.setAmount(new BigDecimal("50.00")); // Publish same event twice kafkaTemplate.send("wallet_events", event); kafkaTemplate.send("wallet_events", event); // Wait for processing await().atMost(10, SECONDS).untilAsserted(() -> { List<TransactionEvent> events = eventRepository .findByWalletId("test-wallet"); assertThat(events).hasSize(1); // Only one saved! }); } ``` **Result:** PASSED. Second event detected and skipped. ### Using Testcontainers Real databases and Kafka in tests: ```java @SpringBootTest @Testcontainers class WalletServiceIntegrationTest { @Container static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>( DockerImageName.parse("postgres:15-alpine") ); @Container static KafkaContainer kafka = new KafkaContainer( DockerImageName.parse("confluentinc/cp-kafka:7.5.0") ); @DynamicPropertySource static void configureProperties(DynamicPropertyRegistry registry) { registry.add("spring.datasource.url", postgres::getJdbcUrl); registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers); } // Tests run against real infrastructure } ``` This caught bugs that mocks never would have. ## Performance Numbers After optimization: | Operation | Throughput | Avg Latency | P95 Latency | P99 Latency | |-----------|------------|-------------|-------------|-------------| | Create Wallet | 450 req/sec | 35ms | 89ms | 145ms | | Fund Wallet | 380 req/sec | 42ms | 105ms | 178ms | | Transfer | 210 req/sec | 78ms | 156ms | 234ms | | Get History | 1,800 req/sec | 12ms | 34ms | 67ms | **Kafka Consumer Lag:** 50-100ms average (eventual consistency window) **Database Load:** 60% CPU utilization at peak throughput **Bottleneck:** Transfers (requires locking two wallets). Could be optimized with queue-based processing. ## What I Learned About Spring Boot ### 1. `@Transactional` Has Limits It only works for **local database transactions**. Kafka publishes, HTTP calls to other services—none of that is rolled back if the transaction fails. This is why the outbox pattern exists. ### 2. Constructor Injection > Field Injection ```java // Bad @Autowired private WalletRepository walletRepository; // Good private final WalletRepository walletRepository; public WalletService(WalletRepository walletRepository) { this.walletRepository = walletRepository; } ``` Benefits: testable, immutable, explicit dependencies. ### 3. Spring Data JPA Method Naming Is Magic ```java List<Wallet> findByUserId(String userId); // Generates: SELECT * FROM wallets WHERE user_id = ? List<Transaction> findByWalletIdOrderByCreatedAtDesc(String walletId); // Generates: SELECT * FROM wallet_transactions // WHERE wallet_id = ? // ORDER BY created_at DESC ``` No SQL writing needed for 90% of queries. ## What I Learned About Kafka ### 1. Manual Offset Commits Are Worth It ```yaml spring: kafka: consumer: enable-auto-commit: false ``` With auto-commit, Kafka marks messages as processed immediately. If your consumer crashes during processing, **the message is lost forever**. With manual commit, you control exactly when to acknowledge: ```java @KafkaListener(topics = "wallet_events") public void consume(WalletEvent event, Acknowledgment ack) { try { processEvent(event); saveToDatabase(event); ack.acknowledge(); // Only after successful processing } catch (Exception e) { // Don't acknowledge → Kafka will redeliver throw e; } } ``` ### 2. Partitioning by Key Ensures Ordering ```java kafkaTemplate.send(topic, event.getWalletId(), event); // ↑ Key ``` Kafka guarantees: **messages with the same key go to the same partition in order**. This means History Service processes all events for wallet-123 in chronological order. Critical for building correct history. ### 3. Consumer Groups Enable Scaling ```yaml spring: kafka: consumer: group-id: history-service-group ``` If you run 3 instances of History Service in the same consumer group, Kafka divides partitions among them: ``` Kafka Topic (3 partitions) ├── Partition 0 → History Service Instance 1 ├── Partition 1 → History Service Instance 2 └── Partition 2 → History Service Instance 3 ``` Free horizontal scaling. ## What I'd Do Differently ### 1. Separate Databases Per Service I used a shared database for simplicity. In production, each service should own its data: ``` Wallet Service → wallet_db History Service → history_db ``` Benefits: true independence, separate scaling, fault isolation. Trade-off: no database-level foreign keys, need application-level consistency. ### 2. Add Distributed Tracing from Day 1 I spent hours debugging issues that spanned services. Zipkin would have shown me exactly where requests were slow or failing. ``` Trace ID: abc-123 Wallet Service (45ms) └─ fundWallet() (42ms) ├─ DB update (10ms) └─ Kafka publish (30ms) ↓ Kafka (20ms) ↓ History Service (15ms) └─ consumeEvent() (12ms) └─ DB insert (10ms) Total: 80ms ``` ### 3. Circuit Breakers for Kafka ```java @CircuitBreaker(name = "kafkaPublisher", fallbackMethod = "fallbackPublish") public void publishEvent(WalletEvent event) { kafkaTemplate.send(topic, event); } public void fallbackPublish(WalletEvent event, Exception ex) { log.error("Circuit breaker OPEN, saving to outbox"); outboxRepository.save(convertToOutbox(event)); } ``` If Kafka is down, stop trying immediately. Use outbox as backup. ### 4. Metrics and Alerting ```java @Component public class WalletMetrics { private final Counter walletsCreated; private final Timer fundingDuration; public WalletMetrics(MeterRegistry registry) { this.walletsCreated = Counter.builder("wallets.created") .register(registry); this.fundingDuration = Timer.builder("wallet.funding.duration") .register(registry); } } ``` Expose via `/actuator/metrics` and scrape with Prometheus. **Alert on:** - Error rate > 1% - P99 latency > 500ms - Kafka consumer lag > 10,000 messages - Optimistic lock retry rate > 5% ## The Honest Truth About Microservices **What you gain:** - Independent scaling - Independent deployment - Technology flexibility - Team autonomy - Fault isolation **What you pay:** - Distributed transactions (sagas, outbox pattern) - Eventual consistency (user experience challenges) - Network failures (circuit breakers, retries) - Monitoring complexity (distributed tracing required) - Testing complexity (need Testcontainers or similar) - Operational overhead (more services to deploy) **Was it worth it?** For learning: 100% yes. For a startup MVP: probably not. Start with a modular monolith, split later. ## Key Takeaways 1. **Optimistic locking** prevents lost updates in concurrent scenarios 2. **Pessimistic locking** with sorted IDs prevents deadlocks in transfers 3. **Outbox pattern** guarantees event delivery even when Kafka is down 4. **Idempotency** handles Kafka's at-least-once delivery 5. **DECIMAL type** for money (never float/double) 6. **Testcontainers** catch bugs mocks miss 7. **Eventual consistency** requires better UX, not just better code ## Code & Resources - **GitHub Repository:** [github.com/yourusername/digital-wallet](https://github.com/yourusername/digital-wallet) - **Database Schema:** Complete with constraints and indexes - **Postman Collection:** All API endpoints with examples **Books that helped:** - *Designing Data-Intensive Applications* by Martin Kleppmann - *Database Internals* by Alex Petrov - *Enterprise Integration Patterns* by Gregor Hohpe **Concepts to study:** - CAP theorem (I chose AP—availability over strict consistency) - Saga pattern (orchestration vs choreography) - Event sourcing vs event-driven architecture - Two-phase commit (and why we avoid it)