Reactive Programming #

Ketika traffic sistem melonjak, pola yang paling dulu kehabisan napas adalah pola request-per-thread yang blocking. Setiap request mengikat satu thread, setiap thread menunggu I/O selesai, dan saat thread pool habis, sistem berhenti merespons meski CPU masih menganggur. Reactive Programming lahir sebagai jawaban atas masalah struktural ini — bukan sekadar cara menulis kode yang lebih keren, melainkan perubahan paradigma dalam cara sistem berpikir tentang data dan waktu. Alih-alih program yang meminta data secara aktif dan menunggu hasilnya, reactive system adalah sistem yang bereaksi terhadap data yang datang secara asynchronous. Panduan ini membahas reactive programming dari prinsip dasarnya, lima konsep inti yang harus dipahami, perbandingan jujur dengan async/await, contoh kode konkret, hingga decision guide yang jelas kapan paradigma ini layak dipakai dan kapan justru over-engineering.

Apa Itu Reactive Programming? #

Reactive Programming adalah paradigma pemrograman yang berfokus pada data stream dan propagasi perubahan secara asynchronous. Program tidak mengontrol kapan data tersedia — ia mendaftarkan diri sebagai subscriber dan bereaksi ketika data mengalir.

Perbedaan paling mendasar dibanding pendekatan imperative:

Pendekatan Imperative (Pull-based):
  1. Program request data
  2. Program menunggu (thread terblokir)
  3. Data tersedia
  4. Program melanjutkan eksekusi

Pendekatan Reactive (Push-based):
  1. Program mendaftar sebagai subscriber
  2. Thread bebas mengerjakan hal lain
  3. Data tersedia → sistem memanggil subscriber
  4. Subscriber bereaksi terhadap data

Analogi yang paling mudah dipahami: imperative programming seperti menelepon restoran untuk menanyakan apakah meja sudah siap — kamu harus menelepon berulang kali. Reactive programming seperti meninggalkan nomor telepon dan meminta mereka menghubungimu saat meja siap — kamu bebas melakukan hal lain sementara menunggu.

// Imperative — pull-based, blocking
func getOrderStatus(orderID string) OrderStatus {
    result := db.Query("SELECT status FROM orders WHERE id = ?", orderID)
    // thread idle selama query berjalan
    return result.Status
}

// Reactive — push-based, non-blocking (konsep RxGo/channel)
func watchOrderStatus(orderID string) <-chan OrderStatus {
    ch := make(chan OrderStatus)
    go func() {
        defer close(ch)
        // eksekusi async — thread bebas, data dikirim ke channel saat siap
        for update := range db.Watch("orders", orderID) {
            ch <- update.Status
        }
    }()
    return ch
}

Lima Prinsip Inti Reactive Programming #

Reactive bukan sekadar “pakai async”. Ada lima prinsip yang membedakannya secara fundamental dari pendekatan asynchronous biasa.

1. Segalanya Adalah Stream #

Di reactive programming, setiap sumber data — HTTP request, event UI, query database, message queue, bahkan timer — diperlakukan sebagai stream: urutan nilai yang mengalir dari waktu ke waktu.

Stream HTTP Request:
  req1 ──── req2 ──── req3 ──── req4 ──→  (waktu)

Stream Event User:
  click ── scroll ── input ── click ──→

Stream Database Change:
  insert ──── update ──── delete ───→

Stream bisa melakukan tiga hal: emit nilai, emit error, atau complete (selesai). Model ini membuat semua sumber data bisa diperlakukan dengan cara yang seragam dan bisa dikombinasikan.

2. Asynchronous dan Non-Blocking #

Reactive system tidak memblokir thread saat menunggu I/O. Sebaliknya, thread mendaftarkan callback dan langsung bebas untuk mengerjakan request lain. Ini yang membuat satu thread bisa melayani ribuan koneksi secara bersamaan.

Thread pool konvensional (blocking):
  Thread 1: [===request A: tunggu DB===]
  Thread 2: [===request B: tunggu API==]
  Thread 3: [===request C: tunggu file=]
  Thread 4: idle (menunggu request baru)
  → Saat traffic tinggi: thread pool habis, request antri

Thread pool reactive (non-blocking):
  Thread 1: [req A setup] → [req B setup] → [req C callback] → [req D setup]
  Thread 2: [req E setup] → [req A callback] → [req F setup] → [req B callback]
  → Thread tidak pernah idle menunggu I/O, throughput jauh lebih tinggi

3. Push-Based Model #

Dalam model pull-based, subscriber meminta data secara aktif. Dalam model push-based reactive, producer yang mendorong data ke subscriber ketika tersedia. Ini perubahan mindset yang fundamental — dari “saya minta data” menjadi “beritahu saya kalau ada data”.

4. Backpressure #

Backpressure adalah mekanisme yang memungkinkan consumer memberitahu producer untuk melambat ketika tidak mampu memproses data secepat data diproduksi. Ini adalah salah satu keunggulan paling penting reactive dibanding async biasa.

Tanpa backpressure:
  Producer  ──[1000 events/s]──→  Consumer [100 events/s]
                                      │
                                  Buffer penuh → memory overflow → crash

Dengan backpressure:
  Producer  ──[1000 events/s]──→  Consumer [100 events/s]
       ↑                               │
       └──[lambat, aku hanya siap 100/s]─┘
  Producer menyesuaikan kecepatan atau melakukan buffering/dropping yang terkontrol
// BENAR: implementasi backpressure dengan channel berkapasitas terbatas di Go
func processOrders(ctx context.Context) {
    // Buffer 100 — producer tidak bisa kirim lebih dari 100 item
    // sebelum consumer mengambilnya
    orderCh := make(chan Order, 100)

    // Producer
    go func() {
        defer close(orderCh)
        for order := range orderSource.Stream(ctx) {
            select {
            case orderCh <- order:
                // berhasil dikirim
            case <-ctx.Done():
                return
            default:
                // ✓ buffer penuh — terapkan strategi: drop, wait, atau kirim ke overflow queue
                metrics.Counter("order.backpressure.dropped").Inc()
                log.Warn("backpressure: order dropped, consumer too slow")
            }
        }
    }()

    // Consumer — hanya memproses secepat kemampuannya
    for order := range orderCh {
        processOrder(order)
    }
}

5. Error sebagai Bagian dari Stream #

Di pemrograman imperative, error adalah exception yang keluar dari alur normal. Di reactive programming, error adalah event seperti event lainnya — ia mengalir melalui stream dan ditangani secara eksplisit sebagai bagian dari pipeline.

Stream dengan error handling:

Normal events:  val1 ──── val2 ──── val3 ──→
                                    │
Error event:    val1 ──── val2 ──── ERR
                                    │
                              onError handler
                                    │
                           retry / fallback / propagate

Model ini memaksa error handling menjadi eksplisit dan terstruktur, bukan tercecer di berbagai tempat sebagai try-catch yang terpencar.


Reactive Manifesto #

Reactive Programming sering dikaitkan dengan Reactive Manifesto — dokumen yang mendefinisikan empat sifat sistem yang dianggap “reactive” dalam arti yang lebih luas:

SifatArtinya
ResponsiveSelalu merespons dalam waktu yang dapat diprediksi, bahkan saat kondisi tidak ideal
ResilientTetap beroperasi saat komponen gagal — kegagalan terisolasi, tidak menjalar
ElasticSkala naik atau turun sesuai beban tanpa perubahan arsitektur
Message-DrivenKomunikasi antar komponen melalui message asynchronous, loose coupling

Reactive Programming sebagai paradigma coding adalah enabler untuk keempat sifat ini — ia menyediakan primitif (stream, backpressure, non-blocking I/O) yang membuat sistem lebih mudah didesain untuk menjadi responsive, resilient, elastic, dan message-driven.


Reactive Programming vs Async/Await #

Ini adalah perbandingan yang paling sering menimbulkan kebingungan. Keduanya asynchronous dan non-blocking, tapi berbeda secara fundamental dalam cara berpikir.

Async/Await — Asynchronous yang Imperative #

Async/await adalah syntactic sugar untuk menulis kode asynchronous dengan gaya linear yang mudah dibaca. Model mentalnya tetap: “lakukan A, tunggu hasilnya, lalu lakukan B”. Kontrol alur masih dipegang oleh pemanggil.

// Async/await style (Go dengan goroutine + channel)
func handleOrder(orderID string) (Response, error) {
    // Alur linear: satu operasi selesai, baru lanjut ke berikutnya
    order, err := fetchOrder(orderID)       // tunggu
    if err != nil { return Response{}, err }

    payment, err := processPayment(order)   // tunggu
    if err != nil { return Response{}, err }

    receipt, err := generateReceipt(payment) // tunggu
    if err != nil { return Response{}, err }

    return Response{Receipt: receipt}, nil
}

Reactive — Stream-Oriented dan Event-Driven #

Reactive mendeskripsikan transformasi data, bukan urutan langkah. Alur adalah: “ketika order tersedia, proses pembayaran; ketika pembayaran berhasil, generate receipt; kirimkan hasilnya ke semua subscriber.”

// Reactive style — pipeline transformasi (konsep, bukan library spesifik)
func orderPipeline(orderStream <-chan Order) <-chan Receipt {
    // Setiap stage adalah transformation, bukan sequential call
    paymentStream := processPayments(orderStream)    // transform stream
    receiptStream := generateReceipts(paymentStream)  // transform stream
    return receiptStream
    // Pipeline berjalan otomatis saat ada data yang mengalir
}

// Consumer mendaftarkan diri — tidak perlu tahu kapan data datang
func main() {
    receipts := orderPipeline(orderSource.Stream())
    for receipt := range receipts {
        notify(receipt.CustomerEmail, receipt)
    }
}

Perbandingan Langsung #

AspekAsync/AwaitReactive Programming
ParadigmaImperative asyncDeclarative, stream-based
Model dataSatu nilai per operasiStream nilai yang berkelanjutan
BackpressureTidak ada secara nativeBawaan (first-class citizen)
KeterbacaanSangat mudah, linierPerlu mindset baru
Cocok untukCRUD, request-responseStreaming, real-time, event
Error handlingTry-catch / return errorError sebagai event di stream
SkalabilitasBaikSangat tinggi
Learning curveRendahCukup tinggi

Kesimpulannya: async/await menyelesaikan masalah keterbacaan kode asynchronous. Reactive programming menyelesaikan masalah level sistem — concurrency ekstrem, backpressure, stream yang tidak pernah selesai. Banyak sistem modern menggunakan keduanya sesuai konteks: async/await untuk logika bisnis yang sederhana, reactive untuk pipeline data yang complex.


Implementasi di Berbagai Ekosistem #

Reactive programming bukan milik satu bahasa atau framework. Berikut implementasi yang paling banyak digunakan:

Ekosistem     Library / Tool              Keterangan
────────────  ──────────────────────────  ─────────────────────────────────
Java          Project Reactor, RxJava     Fondasi Spring WebFlux
Spring        Spring WebFlux              HTTP reactive end-to-end
JavaScript    RxJS                        Sangat populer di Angular
Kotlin        Kotlin Flow, Coroutines     Lebih idiomatis dari RxJava
Dart/Flutter  Stream, StreamController   Native di Dart SDK
Go            channel, select, goroutine  Reactive pattern tanpa library
Scala         Akka Streams               Untuk Akka-based system

Contoh pipeline reaktif sederhana yang menggambarkan konsep operator — filter, transform, dan combine:

// Dart/Flutter — stream pipeline untuk real-time search
Stream<List<Product>> searchProducts(Stream<String> queryStream) {
    return queryStream
        // BENAR: debounce — tunggu 300ms setelah user berhenti mengetik
        .debounceTime(Duration(milliseconds: 300))

        // Filter query terlalu pendek
        .where((query) => query.length >= 2)

        // Jangan proses query yang sama berturut-turut
        .distinct()

        // Untuk setiap query, buat request baru (batalkan yang lama)
        .switchMap((query) => productRepository.search(query)
            .asStream()
            .handleError((error) {
                log.error('Search failed: $error');
                return <Product>[]; // fallback: return empty list
            })
        );
}
// Go — fan-out pattern: satu stream, banyak consumer
func fanOut(input <-chan Event, workers int) []<-chan Event {
    outputs := make([]chan Event, workers)
    for i := range outputs {
        outputs[i] = make(chan Event, 10) // buffer per worker
    }

    go func() {
        defer func() {
            for _, ch := range outputs {
                close(ch)
            }
        }()

        i := 0
        for event := range input {
            // Round-robin ke worker berikutnya
            outputs[i%workers] <- event
            i++
        }
    }()

    result := make([]<-chan Event, workers)
    for i, ch := range outputs {
        result[i] = ch
    }
    return result
}

Tantangan dan Kesalahan Umum #

Reactive programming memang powerful, tapi ada perangkap yang sering membuat engineer frustrasi.

Mixing blocking dan non-blocking code adalah kesalahan paling umum dan paling merusak. Satu blocking call di dalam reactive pipeline menghancurkan manfaat non-blocking yang sudah dibangun.

// ANTI-PATTERN: blocking call di dalam goroutine reactive pipeline
func processStream(events <-chan Event) <-chan Result {
    results := make(chan Result)
    go func() {
        for event := range events {
            // time.Sleep di sini memblokir goroutine ini
            // semua event berikutnya harus menunggu
            time.Sleep(100 * time.Millisecond) // ← JANGAN
            result := heavyComputation(event)
            results <- result
        }
    }()
    return results
}

// BENAR: gunakan worker pool untuk operasi yang membutuhkan waktu
func processStreamConcurrent(events <-chan Event, workers int) <-chan Result {
    results := make(chan Result, workers)
    var wg sync.WaitGroup

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for event := range events {
                // Setiap worker memproses independen — tidak saling memblokir
                result := heavyComputation(event)
                results <- result
            }
        }()
    }

    go func() {
        wg.Wait()
        close(results)
    }()

    return results
}
Debugging reactive code jauh lebih sulit dari kode sequential. Stack trace tidak linear, error bisa terjadi di goroutine yang berbeda dari tempat pipeline didefinisikan, dan urutan event tidak deterministik. Investasikan waktu untuk structured logging dengan correlation ID dan distributed tracing sebelum masalah muncul, bukan sesudah.

Over-engineering sistem sederhana adalah kesalahan kedua yang sering terjadi. Reactive programming membawa kompleksitas yang signifikan — jika sistem kamu adalah CRUD API dengan traffic moderat, biaya kompleksitas ini tidak sebanding dengan manfaatnya.


Kapan Menggunakan dan Kapan Tidak #

GUNAKAN Reactive Programming jika:
  ✓ High concurrency — ribuan koneksi simultan
  ✓ I/O heavy — banyak call ke DB, API eksternal, file
  ✓ Event-driven — message queue, Kafka, real-time pipeline
  ✓ Real-time requirement — chat, live dashboard, notifikasi push
  ✓ Producer jauh lebih cepat dari consumer (backpressure critical)
  ✓ Stream data yang tidak pernah selesai (sensor, log, analytics)

JANGAN gunakan Reactive Programming jika:
  ✗ CRUD API sederhana dengan traffic moderat
  ✗ CPU-heavy computation (image processing, ML training) — reactive tidak mengurangi CPU cost
  ✗ Tim belum familiar — tanpa pemahaman cukup, bug sulit dilacak
  ✗ Sistem kecil dengan umur pendek — complexity tidak sebanding
  ✗ Deadline ketat — learning curve reactive cukup tinggi
Rule of thumb yang berguna: tanyakan dulu apakah masalahnya adalah concurrency dan throughput, atau sekadar keterbacaan kode asynchronous. Jika jawabannya concurrency dan throughput — pertimbangkan reactive. Jika jawabannya keterbacaan — async/await sudah cukup.

Anti-Pattern yang Harus Dihindari #

// ✗ Reactive setengah-setengah — HTTP non-blocking tapi DB masih blocking
// Spring WebFlux controller tapi pakai JPA/JDBC biasa
@GetMapping("/orders")
fun getOrders(): Flux<Order> {
    return Flux.fromIterable(orderRepository.findAll()) // ← JDBC blocking call
    // Ini lebih buruk dari full blocking — blocking di event loop thread
}
// ✓ Gunakan R2DBC atau reactive DB driver untuk end-to-end non-blocking

// ✗ Subscribe di dalam subscribe (nested subscribe)
orderStream.subscribe { order ->
    paymentService.processStream(order).subscribe { payment -> // ← anti-pattern
        notifyStream(payment).subscribe { ... }               // ← callback hell reactive
    }
}
// ✓ Gunakan operator chaining (flatMap, switchMap) — lebih bersih dan bisa dikontrol

// ✗ Tidak menutup stream — memory leak
val subscription = dataStream.subscribe { ... }
// lupa subscription.dispose() / cancel() saat komponen di-destroy
// ✓ Selalu manage lifecycle subscription — dispose saat tidak diperlukan

// ✗ Error handling diabaikan — stream berhenti diam-diam saat error
stream.map { transform(it) } // transform bisa throw, stream langsung mati
// ✓ Tangani error secara eksplisit di setiap stage
stream
    .map { transform(it) }
    .onErrorReturn(defaultValue)     // fallback
    .retry(3)                        // atau retry
    .doOnError { log.error(it) }     // atau log dan propagate

Ringkasan #

  • Reactive Programming adalah paradigma berbasis data stream dan push model — sistem bereaksi terhadap data yang datang, bukan meminta data secara aktif.
  • Lima prinsip inti: everything is a stream, asynchronous non-blocking, push-based model, backpressure, dan error sebagai bagian dari stream.
  • Backpressure adalah keunggulan utama reactive dibanding async biasa — consumer bisa memberitahu producer untuk melambat, mencegah memory overflow.
  • Reactive Manifesto mendefinisikan sistem yang responsive, resilient, elastic, dan message-driven — reactive programming adalah enabler untuk keempatnya.
  • Reactive ≠ async/await: async/await menyelesaikan masalah keterbacaan kode; reactive menyelesaikan masalah level sistem — concurrency ekstrem dan streaming berkelanjutan.
  • End-to-end non-blocking adalah syarat mutlak — satu blocking call di tengah pipeline merusak seluruh manfaat reactive.
  • Backpressure harus didesain, bukan ditambahkan belakangan — tentukan strategi buffer, drop, atau wait sejak awal.
  • Observability wajib: structured logging dengan correlation ID dan distributed tracing karena stack trace reactive tidak linear.
  • Jangan over-engineer: reactive membawa kompleksitas nyata — gunakan hanya jika masalahnya adalah concurrency, throughput, atau stream yang berkelanjutan.

← Sebelumnya: Replay Strategy   Berikutnya: Async Processing →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact