Reactive Programming #
Dalam beberapa tahun terakhir, istilah Reactive Programming semakin sering muncul — terutama saat membangun sistem yang high-traffic, real-time, atau event-driven. Banyak engineer menganggapnya sekadar “async dengan style lain”, padahal sebenarnya reactive programming adalah perubahan paradigma berpikir, bukan cuma soal API atau library tertentu. Memahaminya hanya di permukaan — menggunakan RxJS atau WebFlux tanpa mengerti prinsip dasarnya — hampir selalu berakhir dengan sistem yang sulit di-debug, mixing blocking dan non-blocking code yang berbahaya, dan kompleksitas yang tidak sebanding dengan manfaatnya. Artikel ini membahas apa itu reactive programming dari prinsipnya, mengapa ia muncul sebagai solusi atas keterbatasan pendekatan tradisional, bagaimana cara implementasinya, dan kapan kamu harus atau justru tidak boleh menggunakannya.
Apa Itu Reactive Programming? #
Reactive Programming adalah paradigma pemrograman yang berfokus pada data stream dan propagasi perubahan secara asynchronous. Alih-alih program yang meminta data secara sinkron dan mengontrol alur eksekusi secara eksplisit, reactive programming mendefinisikan bagaimana sistem bereaksi terhadap event dan perubahan state — secara otomatis, non-blocking, dan asynchronous.
Perubahan mindset yang paling mendasar ada di sini:
| Model | Cara Berpikir | Analogi |
|---|---|---|
| Imperative | “Ambil data → proses → kirim response” | Pergi ke toko, beli barang, pulang |
| Async/Await | “Minta data, nanti saya lanjut saat sudah ada” | Pesan online, tunggu notifikasi |
| Reactive | “Beritahu saya jika ada data baru, saya akan bereaksi” | Langganan newsletter — kamu tidak minta, tapi konten datang sendiri |
Dalam reactive programming, semua hal diperlakukan sebagai stream — HTTP request, pesan dari message queue, event UI, perubahan database, bahkan error sekalipun. Stream ini bisa di-transform, di-filter, digabungkan, dan dikonsumsi tanpa harus memblokir thread yang menunggunya.
flowchart LR
subgraph Imperative["Pendekatan Imperative / Blocking"]
A1[Request Masuk] --> B1[Thread Dialokasikan]
B1 --> C1[Tunggu DB Response\n⏳ Thread idle]
C1 --> D1[Tunggu API Call\n⏳ Thread idle]
D1 --> E1[Kirim Response]
E1 --> F1[Thread Dilepas]
end
subgraph Reactive["Pendekatan Reactive / Non-Blocking"]
A2[Request Masuk] --> B2[Thread Terima Request]
B2 --> C2[Daftar ke DB Stream\nThread bebas ke request lain]
C2 --> D2[DB Result Tiba\nThread diambil dari pool]
D2 --> E2[Kirim Response]
endMengapa Reactive Programming Muncul? #
Reactive bukan tren iseng. Ia muncul karena keterbatasan nyata dari pendekatan tradisional dalam menghadapi skala sistem modern.
Masalah Model Thread-per-Request #
Selama puluhan tahun, model paling umum adalah: satu request = satu thread. Thread menunggu I/O (database, API eksternal, file system), dan selama menunggu, thread tersebut idle — mengkonsumsi memory tapi tidak melakukan pekerjaan berguna.
sequenceDiagram
participant Client
participant Thread1 as Thread 1\n(untuk Request A)
participant Thread2 as Thread 2\n(untuk Request B)
participant DB
Client->>Thread1: Request A masuk
Client->>Thread2: Request B masuk
Thread1->>DB: Query DB
Thread2->>DB: Query DB
Note over Thread1,Thread2: Kedua thread IDLE menunggu DB\nMemori terpakai, CPU tidak
DB-->>Thread1: Result setelah 200ms
DB-->>Thread2: Result setelah 200ms
Thread1-->>Client: Response A
Thread2-->>Client: Response B
Note over Thread1,Thread2: Dengan 10.000 request concurrent\n→ 10.000 thread → OOMDi traffic tinggi, thread pool habis, request baru masuk antrian, latency naik, dan akhirnya sistem collapse. Bukan karena CPU tidak mampu — tapi karena thread menunggu I/O yang lamban.
Solusi Reactive #
sequenceDiagram
participant Client
participant EventLoop as Event Loop\n(satu thread)
participant DB
Client->>EventLoop: Request A masuk
Client->>EventLoop: Request B masuk
EventLoop->>DB: Query DB untuk A (non-blocking)
EventLoop->>DB: Query DB untuk B (non-blocking)
Note over EventLoop: Event loop bebas terima request lain
DB-->>EventLoop: Result A tiba (event)
EventLoop-->>Client: Response A
DB-->>EventLoop: Result B tiba (event)
EventLoop-->>Client: Response B
Note over EventLoop: Satu event loop melayani ribuan requestDengan model reactive dan non-blocking I/O, satu thread (event loop) bisa melayani ribuan koneksi bersamaan karena ia tidak pernah “menunggu” — ia hanya terdaftar untuk menerima notifikasi saat hasil I/O siap.
Lima Prinsip Inti Reactive Programming #
1. Data Stream #
Dalam reactive programming, segala sesuatu adalah stream. Stream adalah urutan event yang berjalan seiring waktu — bisa emit nilai, emit error, atau selesai (complete).
flowchart LR
A[HTTP Request] --> S[Stream]
B[DB Change] --> S
C[Queue Message] --> S
D[UI Event] --> S
S --> F[filter]
F --> M[map / transform]
M --> FL[flatMap / merge]
FL --> SUB[subscribe\nkonsumen akhir]Stream memiliki tiga kemungkinan sinyal:
| Sinyal | Artinya | Penanganan |
|---|---|---|
| onNext(value) | Data baru tersedia | Proses nilai |
| onError(err) | Terjadi error | Handle error, stream selesai |
| onComplete() | Stream selesai normal | Cleanup resources |
2. Asynchronous & Non-Blocking #
Reactive system tidak pernah memblokir thread yang sedang berjalan. Setiap operasi I/O didaftarkan sebagai callback atau handler, dan thread bebas melanjutkan pekerjaan lain sambil menunggu.
// ANTI-PATTERN: blocking I/O — thread idle menunggu
func getUserData(userID string) UserData {
user := db.Query("SELECT * FROM users WHERE id = ?", userID) // BLOCK
orders := api.GetOrders(userID) // BLOCK setelah yang pertama
return merge(user, orders)
}
// BENAR: non-blocking — daftarkan callback, thread bebas
func getUserDataReactive(userID string) <-chan UserData {
result := make(chan UserData, 1)
go func() {
// Jalankan keduanya concurrent, tidak sequential
userCh := db.QueryAsync("SELECT * FROM users WHERE id = ?", userID)
orderCh := api.GetOrdersAsync(userID)
user := <-userCh
orders := <-orderCh
result <- merge(user, orders)
}()
return result
}
3. Push-Based Model #
Ini perubahan mindset paling fundamental. Dalam model imperative, caller mengontrol kapan data diambil (pull). Dalam model reactive, data di-push ke subscriber saat tersedia.
Pull (Imperative): Push (Reactive):
Caller: "Beri saya data" Producer: "Ada data baru!"
DB: "Ini datanya" Subscriber: "Oke, saya proses"
Caller: "Proses..." Producer: "Ada data lagi!"
Caller: "Beri saya lagi" Subscriber: "Oke, saya proses lagi"
Implikasi praktisnya: kamu tidak lagi menulis loop yang meminta data — kamu mendefinisikan apa yang terjadi saat data datang.
4. Backpressure #
Backpressure adalah mekanisme mengendalikan aliran data agar consumer tidak kewalahan oleh producer yang terlalu cepat. Ini adalah fitur yang tidak ada di async/await biasa.
flowchart TD
subgraph TanpaBackpressure["❌ Tanpa Backpressure"]
P1[Producer\n10.000 event/detik] --> B1[Buffer]
B1 --> C1[Consumer\n1.000 event/detik]
B1 --> OOM[💥 OutOfMemory\nBuffer overflow]
end
subgraph DenganBackpressure["✅ Dengan Backpressure"]
P2[Producer] --> BP[Backpressure Signal]
BP --> C2[Consumer]
C2 -- "Saya hanya bisa\nterima 1000/detik" --> BP
BP -- "Pelan-pelan,\nthrottle ke 1000/detik" --> P2
endStrategi backpressure yang umum:
| Strategi | Cara Kerja | Cocok Untuk |
|---|---|---|
| Buffer | Tampung event di buffer sementara | Traffic burst pendek |
| Drop | Buang event jika consumer kewalahan | Metrics, log non-kritis |
| Latest | Hanya simpan nilai terbaru, skip sisanya | Live dashboard, harga saham |
| Error | Lempar error jika buffer penuh | Data kritis yang tidak boleh hilang |
| Slow down producer | Kirim sinyal ke producer untuk melambat | Ideal jika producer supportif |
5. Error sebagai Bagian dari Stream #
Dalam reactive programming, error bukan exception yang meledak dan menghentikan alur. Error adalah event dalam stream yang bisa di-handle, di-recover, atau di-retry dengan cara yang sama seperti menangani data normal.
// ANTI-PATTERN: error handling terpisah dari stream logic — mengganggu alur
func processOrders(orders []Order) error {
for _, order := range orders {
result, err := processOrder(order)
if err != nil {
// tiba-tiba loncat ke sini, alur terputus
return err
}
sendNotification(result)
}
return nil
}
// BENAR: error sebagai bagian dari stream — bisa retry, fallback, atau skip
orderStream.
Map(processOrder). // jika error, stream emit error event
OnErrorRetry(3). // coba lagi 3x sebelum menyerah
OnErrorReturn(defaultVal). // fallback jika masih gagal
Filter(isSuccess). // hanya lanjutkan jika sukses
Subscribe(sendNotification)
Reactive Manifesto #
Reactive Programming erat kaitannya dengan Reactive Manifesto — dokumen yang mendefinisikan empat sifat sistem modern yang ideal. Reactive programming adalah enabler teknis untuk keempat sifat ini.
flowchart TD
RM[Reactive System] --> R[Responsive\nMerespons cepat dalam kondisi apapun]
RM --> RE[Resilient\nTetap hidup dan fungsional saat ada error]
RM --> E[Elastic\nScale up/down sesuai beban]
RM --> MD[Message-Driven\nBerbasis event dan message asynchronous]
MD --> R
MD --> RE
MD --> E| Sifat | Artinya dalam Praktik | Reactive Enabler |
|---|---|---|
| Responsive | Latency rendah dan konsisten meski traffic tinggi | Non-blocking I/O, event loop |
| Resilient | Isolasi kegagalan — satu komponen gagal tidak merembet | Error sebagai event, circuit breaker |
| Elastic | Mudah scale horizontal tanpa perubahan arsitektur | Stateless, message-driven |
| Message-Driven | Komponen berkomunikasi lewat message, bukan direct call | Stream, queue, event bus |
Implementasi: Dari Konseptual ke Kode #
Alur Stream Konseptual #
flowchart LR
A[Request Stream] --> B[Validation Stream\nfilter invalid]
B --> C[Enrichment Stream\nmap + flatMap]
C --> D[Business Logic Stream]
D --> E[Response Stream\nkirim ke client]
D --> F[Audit Stream\nlog ke storage]
B -- Error Event --> G[Error Handler Stream\nreturn 400]
D -- Error Event --> H[Error Handler Stream\nreturn 500]Setiap tahap tidak blocking, bisa parallel, dan kegagalan di satu tahap tidak otomatis menghentikan tahap lain.
Contoh Implementasi Go dengan Channel #
// ANTI-PATTERN: sequential blocking — total latency = sum of all latency
func getUserDashboard(userID string) Dashboard {
profile := db.GetProfile(userID) // 100ms
orders := db.GetRecentOrders(userID) // 150ms
notifs := db.GetNotifications(userID) // 80ms
// Total: 330ms
return buildDashboard(profile, orders, notifs)
}
// BENAR: concurrent non-blocking — total latency = max of all latency
func getUserDashboardReactive(ctx context.Context, userID string) Dashboard {
profileCh := make(chan Profile, 1)
ordersCh := make(chan []Order, 1)
notifsCh := make(chan []Notification, 1)
// Semua request jalan bersamaan
go func() { profileCh <- db.GetProfile(userID) }()
go func() { ordersCh <- db.GetRecentOrders(userID) }()
go func() { notifsCh <- db.GetNotifications(userID) }()
// Tunggu semua hasil — total latency ~150ms (paling lambat), bukan 330ms
return buildDashboard(<-profileCh, <-ordersCh, <-notifsCh)
}
Ekosistem Reactive per Platform #
| Platform | Library / Framework | Use Case Utama |
|---|---|---|
| Java | Project Reactor, RxJava, Spring WebFlux | Backend microservices |
| JavaScript | RxJS | Frontend state, HTTP intercept |
| Go | Channel, select, goroutine | Backend service, pipeline |
| Kotlin | Coroutines Flow | Android, backend |
| Dart/Flutter | Stream, StreamBuilder | UI reactive, real-time |
| Scala | Akka Streams | High-throughput data pipeline |
Contoh Rx Pattern (Konseptual) #
// ANTI-PATTERN: nested callback — callback hell, sulit dibaca
getUser(userID, function(user) {
getOrders(user.id, function(orders) {
getRecommendations(orders, function(recs) {
sendResponse(recs) // 3 level nesting, error handling tersebar
})
})
})
// BENAR: reactive stream — linear, composable, error terpusat
userStream$
.pipe(
filter(user => user.isActive), // hanya user aktif
switchMap(user => getOrders(user.id)), // fetch orders, batalkan yang lama
switchMap(orders => getRecommendations(orders)),
catchError(err => of(defaultRecommendations)), // fallback jika error
)
.subscribe(recs => sendResponse(recs))
Best Practice Implementasi #
End-to-End Non-Blocking #
Reactive yang setengah-setengah lebih berbahaya dari tidak reactive sama sekali. Satu blocking call di tengah pipeline reactive bisa memblokir seluruh event loop.
// ANTI-PATTERN: reactive di controller, blocking di database
// Ini lebih buruk dari sepenuhnya blocking
func (h *Handler) GetUser(c *fiber.Ctx) error {
// Controller reactive / async
userID := c.Params("id")
// Tapi DB call masih BLOCKING — memblokir event loop!
user := h.db.First(&User{}, userID) // ← blocking di tengah async context
return c.JSON(user)
}
// BENAR: end-to-end non-blocking
func (h *Handler) GetUser(c *fiber.Ctx) error {
userID := c.Params("id")
// Gunakan async DB driver atau jalankan di goroutine terpisah
user, err := h.userRepo.FindByIDAsync(c.Context(), userID)
if err != nil {
return c.Status(500).JSON(fiber.Map{"error": err.Error()})
}
return c.JSON(user)
}
Satu blocking call di dalam reactive pipeline — termasuk time.Sleep, blocking DB driver, atau sync HTTP client — bisa memblokir seluruh event loop dan menghilangkan semua manfaat reactive. Pastikan setiap layer menggunakan non-blocking I/O.Backpressure Wajib Dipikirkan #
// ANTI-PATTERN: channel tanpa buffer limit — OOM saat producer cepat
events := make(chan Event) // unbuffered atau buffer tidak terbatas
// Producer cepat sekali
go func() {
for i := 0; i < 1_000_000; i++ {
events <- generateEvent() // goroutine numpuk jika consumer lambat
}
}()
// BENAR: buffer terbatas + drop strategy yang eksplisit
events := make(chan Event, 1000) // buffer terbatas
go func() {
for event := range source {
select {
case events <- event:
// berhasil masuk buffer
default:
// buffer penuh — drop dengan log
metrics.Counter("events.dropped").Inc()
log.Warn("event dropped due to backpressure", "event_id", event.ID)
}
}
}()
Error Handling Eksplisit dalam Stream #
// ANTI-PATTERN: panic/error yang keluar dari stream tanpa handling
go func() {
for event := range events {
result, err := process(event)
if err != nil {
panic(err) // crash seluruh goroutine!
}
output <- result
}
}()
// BENAR: error sebagai bagian dari alur, dengan retry dan fallback
go func() {
for event := range events {
result, err := processWithRetry(event, 3)
if err != nil {
// Kirim ke DLQ, jangan crash pipeline
dlq <- DeadEvent{Event: event, Error: err}
metrics.Counter("stream.errors").Inc()
continue // stream terus berjalan
}
output <- result
}
}()
Observability di Sistem Async #
Debugging sistem reactive jauh lebih sulit dari sistem synchronous karena stack trace tidak linear. Correlation ID adalah keharusan.
// ANTI-PATTERN: tidak ada context propagation — tidak bisa trace request
func processEvent(event Event) {
log.Info("processing event") // log ini dari request mana?
result := callDownstream(event)
log.Info("done")
}
// BENAR: correlation ID propagasi ke seluruh chain
func processEvent(ctx context.Context, event Event) {
correlationID := ctx.Value("correlation_id").(string)
log.WithFields(log.Fields{
"correlation_id": correlationID,
"event_id": event.ID,
"event_type": event.Type,
}).Info("processing event")
// Propagasi context ke downstream call
result, err := callDownstream(ctx, event)
log.WithFields(log.Fields{
"correlation_id": correlationID,
"success": err == nil,
}).Info("event processed")
}
Reactive Programming vs Async/Await #
Banyak engineer menyamakan keduanya. Mereka mirip tujuannya — non-blocking dan asynchronous — tapi berbeda secara paradigma dan cocok untuk masalah yang berbeda.
flowchart TD
A[Butuh Non-Blocking?] --> B{Berapa banyak\nevent/data?}
B -- Satu request,\nsatu response --> C[Async/Await\nLebih simple, readable]
B -- Stream kontinu,\nbanyak event --> D[Reactive Programming\nStream-oriented]
C --> E{Butuh\nbackpressure?}
D --> F{Tim sudah\npaham reactive?}
E -- Tidak --> G[Async/Await sudah cukup ✓]
E -- Ya --> D
F -- Tidak --> H[Mulai dengan Async/Await\nMigrate bertahap]
F -- Ya --> I[Reactive Programming ✓]| Aspek | Async/Await | Reactive Programming |
|---|---|---|
| Paradigma | Imperative async | Declarative & stream-based |
| Model data | Request → satu Response | Stream event yang terus mengalir |
| Backpressure | ❌ Tidak ada built-in | ✅ Native, fundamental |
| Readability | Sangat mudah dibaca | Perlu mindset dan pengalaman |
| Cocok untuk | CRUD, API sederhana, satu response | Streaming, real-time, event processing |
| Error handling | Try/catch yang familiar | Error sebagai event dalam stream |
| Debugging | Stack trace linear | Perlu correlation ID dan tracing |
| Learning curve | Rendah | Tinggi |
| Over-engineering risk | Rendah | Tinggi jika salah konteks |
Kesimpulan perbandingan: async/await menyelesaikan syntax problem — bagaimana menulis async code dengan readable. Reactive programming menyelesaikan system-level problem — bagaimana membangun sistem yang bisa menangani aliran data kontinu dengan efisien di skala besar. Banyak sistem modern menggunakan keduanya sesuai konteks.
Kapan Harus dan Tidak Boleh Menggunakan Reactive #
Gunakan Reactive Jika: #
| Kondisi | Contoh Konkret |
|---|---|
| High concurrency | Ribuan koneksi WebSocket bersamaan |
| I/O heavy | API gateway yang memanggil 10+ downstream service |
| Event-driven system | Consumer Kafka, SQS, Pub/Sub |
| Real-time requirement | Live dashboard, chat, stock price feed |
| Backpressure kritis | Pipeline data di mana producer lebih cepat dari consumer |
| Streaming data | Video streaming, log aggregation pipeline |
Jangan Gunakan Reactive Jika: #
❌ CRUD API sederhana dengan traffic rendah
→ Reactive hanya menambah kompleksitas tanpa benefit
❌ CPU-intensive task (image processing, ML inference, enkripsi berat)
→ Reactive tidak mengurangi CPU cost; gunakan worker pool
❌ Tim belum memahami async dan stream thinking
→ Bug sulit dilacak, maintenance mahal, onboarding lama
❌ Sistem kecil atau short-lived
→ Investasi learning curve tidak sebanding
❌ Seluruh stack tidak bisa non-blocking
→ Half-reactive lebih berbahaya dari fully blocking
Jangan gunakan Reactive Programming hanya karena terdengar canggih atau teknologinya populer. Reactive yang salah konteks menghasilkan sistem yang lebih kompleks, lebih sulit di-debug, dan lebih susah di-maintain — tanpa keuntungan performa apapun.
Rule of thumb:
- Gunakan Reactive jika masalahnya adalah skala, concurrency, dan aliran event kontinu
- Gunakan Async/Await jika masalahnya adalah keterbacaan dan kecepatan development
- Mulai sederhana — tambahkan reactive complexity hanya saat kamu sudah membuktikan bahwa kamu membutuhkannya
Anti-Pattern yang Harus Dihindari #
// ✗ Reactive setengah-setengah — blocking di tengah pipeline async
go func() {
for event := range stream {
result := blockingDBCall(event) // memblokir goroutine
output <- result
}
}()
// ✓ Gunakan async DB driver atau jalankan blocking call di goroutine tersendiri
// ✗ Channel tanpa backpressure — OOM menunggu waktu
events := make(chan Event, 1_000_000) // buffer besar bukan solusi
// ✓ Buffer terbatas + drop policy yang eksplisit + metrics
// ✗ Tidak ada correlation ID — debugging jadi investigasi buta
log.Info("processing") // ini dari request mana??
// ✓ Propagasi context dengan correlation_id ke seluruh chain
// ✗ Error di-panic — crash seluruh pipeline
if err != nil { panic(err) }
// ✓ Error sebagai event: kirim ke error channel atau DLQ, pipeline terus jalan
// ✗ Reactive untuk CRUD sederhana — over-engineering
// 5 operator Rx untuk query yang bisa diselesaikan satu SQL + async/await
userStream.pipe(filter(...), map(...), switchMap(...), mergeMap(...), catchError(...))
.subscribe(sendResponse)
// ✓ async/await sudah cukup untuk request-response sederhana
Checklist Implementasi Reactive #
ARSITEKTUR:
□ Semua layer menggunakan non-blocking I/O (tidak ada blocking call di event loop)
□ DB driver mendukung async (pgx async, MongoDB async, dsb)
□ HTTP client menggunakan non-blocking request
□ Backpressure strategy sudah didefinisikan untuk setiap stream
IMPLEMENTASI:
□ Buffer channel dibatasi — tidak ada unbounded buffer
□ Drop strategy eksplisit dengan logging dan metrics
□ Error tidak di-panic — dikirim ke error channel atau DLQ
□ Retry dengan limit untuk operasi yang bisa di-retry
OBSERVABILITY:
□ Correlation ID dibuat di entry point dan dipropagasi ke seluruh chain
□ Setiap stage stream punya metrics: throughput, latency, error rate
□ Distributed tracing aktif (Jaeger, Zipkin, atau OpenTelemetry)
TESTING:
□ Test concurrent: beberapa producer + consumer bersamaan
□ Test backpressure: producer jauh lebih cepat dari consumer
□ Test error recovery: downstream gagal, pipeline tetap berjalan
□ Load test: validasi performa vs blocking equivalent
Ringkasan #
- Reactive Programming adalah paradigma, bukan library — ia mengubah cara berpikir tentang data dan event, dari “minta dan tunggu” menjadi “daftar dan bereaksi”.
- Non-blocking I/O adalah fondasinya — satu event loop bisa melayani ribuan koneksi karena tidak pernah idle menunggu; thread bebas melanjutkan pekerjaan lain.
- Segala sesuatu adalah stream — HTTP request, pesan queue, event UI, bahkan error sekalipun diperlakukan sebagai event dalam stream yang bisa di-transform dan dikonsumsi.
- Backpressure adalah fitur eksklusif reactive — mekanisme consumer memberi sinyal ke producer agar melambat; tidak ada di async/await biasa.
- Error sebagai event, bukan exception — error di-handle dalam stream sama seperti data; bisa di-retry, di-fallback, atau di-forward ke DLQ tanpa menghentikan pipeline.
- Reactive ≠ Async/Await — async/await menyelesaikan keterbacaan kode async; reactive menyelesaikan masalah skala, concurrency, dan aliran event kontinu.
- End-to-end non-blocking atau tidak sama sekali — reactive setengah-setengah lebih berbahaya dari fully blocking karena bisa memblokir event loop.
- Observability adalah keharusan — correlation ID dan distributed tracing wajib ada karena stack trace di sistem async tidak linear.
- Jangan reactive tanpa kebutuhan — CRUD sederhana, CPU-heavy task, dan tim yang belum siap adalah alasan yang valid untuk tidak menggunakan reactive.
← Sebelumnya: Replay Strategy Berikutnya: Async Processing →