Latency numbers

L1 cache reference ......................... 0.5 ns
Branch mispredict ............................ 5 ns
L2 cache reference ........................... 7 ns
Mutex lock/unlock ........................... 25 ns
Main memory reference ...................... 100 ns             
Compress 1K bytes with Zippy ............. 3,000 ns  =   3 µs
Send 2K bytes over 1 Gbps network ....... 20,000 ns  =  20 µs
SSD random read ........................ 150,000 ns  = 150 µs
Read 1 MB sequentially from memory ..... 250,000 ns  = 250 µs
Round trip within same datacenter ...... 500,000 ns  = 0.5 ms
Read 1 MB sequentially from SSD* ..... 1,000,000 ns  =   1 ms
Disk seek ........................... 10,000,000 ns  =  10 ms
Read 1 MB sequentially from disk .... 20,000,000 ns  =  20 ms
Send packet CA->Netherlands->CA .... 150,000,000 ns  = 150 ms

(source: Peter Norvig)

By type

L1 cache reference ......................... 0.5 ns               Memory
Branch mispredict ............................ 5 ns
L2 cache reference ........................... 7 ns
Mutex lock/unlock ........................... 25 ns
Main memory reference ...................... 100 ns             
Read 1 MB sequentially from memory ..... 250,000 ns  = 250 µs
Compress 1K bytes with Zippy ............. 3,000 ns  =   3 µs     Disk
SSD random read ........................ 150,000 ns  = 150 µs
Read 1 MB sequentially from SSD* ..... 1,000,000 ns  =   1 ms
Disk seek ........................... 10,000,000 ns  =  10 ms
Read 1 MB sequentially from disk .... 20,000,000 ns  =  20 ms
Send 2K bytes over 1 Gbps network ....... 20,000 ns  =  20 µs     Network
Round trip within same datacenter ...... 500,000 ns  = 0.5 ms
Send packet CA->Netherlands->CA .... 150,000,000 ns  = 150 ms

Humanized latency numbers

Let's multiply all these durations by a billion!

L1 cache reference ......................... 0.5 s
Branch mispredict ............................ 5 s
L2 cache reference ........................... 7 s
Mutex lock/unlock .......................... 0.5 min
Main memory reference ...................... 1.5 min             
Compress 1K bytes with Zippy ................ 50 min
Send 2K bytes over 1 Gbps network .......... 5.5 hr
SSD random read ............................ 1.7 days
Read 1 MB sequentially from memory ......... 2.9 days
Round trip within same datacenter .......... 5.8 days
Read 1 MB sequentially from SSD* .......... 11.6 days
Disk seek ................................. 16.5 weeks
Read 1 MB sequentially from disk ........... 7.8 months
Send packet CA->Netherlands->CA ............ 4.8 years

(source: P. Stark)
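
A quick sketch of the rescaling (the humanize helper below is purely illustrative): every nanosecond of machine time becomes one second of human time.

// multiply by 10^9: interpret each nanosecond as one second
def humanize(ns: Double): String = {
  val s = ns                        // e.g. 100 ns of latency -> 100 s of human time
  if      (s < 60)        f"$s%.1f s"
  else if (s < 3600)      f"${s / 60}%.1f min"
  else if (s < 86400)     f"${s / 3600}%.1f hr"
  else if (s < 7 * 86400) f"${s / 86400}%.1f days"
  else                    f"${s / (7 * 86400)}%.1f weeks"
}

humanize(25)         // mutex lock/unlock -> "25.0 s"
humanize(10000000)   // disk seek         -> "16.5 weeks"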

Minute

L1 cache reference 0.5 s
One heartbeat

Branch mispredict 5 s
Yawn

L2 cache reference 7 s
Long yawn

Mutex lock/unlock 25 s
Making a coffee (push-button)

Hour

Main memory reference 100 s
Brushing your teeth

Compress 1K bytes with Zippy 50 min
One episode of a TV show (including ad breaks)

Day

Send 2K bytes over 1 Gbps network 5.5 hr
From lunch to the end of the workday

Week

SSD random read 1.7 days
A normal weekend

Read 1 MB sequentially from memory 2.9 days
A long weekend

Round trip within same datacenter 5.8 days
A medium vacation

Read 1 MB sequentially from SSD 11.6 days
Waiting for almost 2 weeks for a delivery

Year

Disk seek 16.5 weeks
A semester in university

Read 1 MB sequentially from disk 7.8 months
Almost producing a new human being

The two above together ≈ 1 year

Decade

Send packet CA->Netherlands->CA 4.8 years
Average time it takes to complete a bachelor's degree

Summary (humanized)

L1 cache reference ......................... 0.5 s
Branch mispredict ............................ 5 s
L2 cache reference ........................... 7 s
Mutex lock/unlock .......................... 0.5 min
Main memory reference ...................... 1.5 min             
Compress 1K bytes with Zippy ................ 50 min
Send 2K bytes over 1 Gbps network .......... 5.5 hr
SSD random read ............................ 1.7 days
Read 1 MB sequentially from memory ......... 2.9 days
Round trip within same datacenter .......... 5.8 days
Read 1 MB sequentially from SSD* .......... 11.6 days
Disk seek ................................. 16.5 weeks
Read 1 MB sequentially from disk ........... 7.8 months
Send packet CA->Netherlands->CA ............ 4.8 years

Summary by type (humanized)

L1 cache reference ......................... 0.5 s        Memory
Branch mispredict ............................ 5 s
L2 cache reference ........................... 7 s
Mutex lock/unlock .......................... 0.5 min
Main memory reference ...................... 1.5 min             
Read 1 MB sequentially from memory ......... 2.9 days
Compress 1K bytes with Zippy ................ 50 min      Disk
SSD random read ............................ 1.7 days
Read 1 MB sequentially from SSD* .......... 11.6 days
Disk seek ................................. 16.5 weeks
Read 1 MB sequentially from disk ........... 7.8 months
Send 2K bytes over 1 Gbps network .......... 5.5 hr       Network
Round trip within same datacenter .......... 5.8 days
Send packet CA->Netherlands->CA ............ 4.8 years

Hadoop System Design

Hadoop: disk + network

Compress 1K bytes with Zippy ................ 50 min      Disk
SSD random read ............................ 1.7 days
Read 1 MB sequentially from SSD* .......... 11.6 days
Disk seek ................................. 16.5 weeks
Read 1 MB sequentially from disk ........... 7.8 months
Send 2K bytes over 1 Gbps network .......... 5.5 hr       Network
Round trip within same datacenter .......... 5.8 days
Send packet CA->Netherlands->CA ............ 4.8 years

Spark System Design

Spark: memory + network

L1 cache reference ......................... 0.5 s        Memory
Branch mispredict ............................ 5 s
L2 cache reference ........................... 7 s
Mutex lock/unlock .......................... 0.5 min
Main memory reference ...................... 1.5 min             
Read 1 MB sequentially from memory ......... 2.9 days
Send 2K bytes over 1 Gbps network .......... 5.5 hr       Network
Round trip within same datacenter .......... 5.8 days
Send packet CA->Netherlands->CA ............ 4.8 years

How to limit latency?

size of data (sketched below)
persistence
partitioning
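
A minimal sketch of the first lever, shrinking data early; the word-count pipeline below is only an illustration:

// map each word to a compact (word, 1) pair and aggregate per partition first
// (reduceByKey combines map-side), so much less data is shuffled over the network
val counts = sc
  .textFile("input/books/11.txt.utf-8")
  .flatMap { _.split("\\s+") }
  .filter { !_.isEmpty }
  .map { word => word -> 1 }
  .reduceByKey { _ + _ }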

Persistence

Storage Level            Store as
MEMORY_ONLY              Deserialized in JVM; partitions that don't fit are recomputed when needed.
MEMORY_AND_DISK          Deserialized in JVM; partitions that don't fit spill to disk.
MEMORY_ONLY_SER          Serialized in JVM.
MEMORY_AND_DISK_SER      Like MEMORY_ONLY_SER, spill to disk.
DISK_ONLY                Serialized to disk.
MEMORY_ONLY_2, …         Same as the levels above, with each partition replicated on 2 nodes.
OFF_HEAP (experimental)  Like MEMORY_ONLY_SER, but stored in off-heap memory.
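
A minimal sketch of selecting one of these levels explicitly (MEMORY_AND_DISK_SER here, on the same input file used in the examples below):

import org.apache.spark.storage.StorageLevel

// keep partitions serialized in memory and spill whatever doesn't fit to disk
val lines = sc
  .textFile("input/books/11.txt.utf-8")
  .persist(StorageLevel.MEMORY_AND_DISK_SER)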

Example

val words = sc
  .textFile("input/books/11.txt.utf-8")
  .flatMap {
    _.filter(c => c.isLetter || c.isWhitespace)
      .toUpperCase.split(' ')
  }
  .filter { !_.isEmpty }

// no cache/persist: the whole pipeline above is re-executed for each action below
val wordCount = words.countByValue.toSeq
val wordScore = words
  .distinct
  .map {
    word =>
      // scores is assumed to be a Map[Char, Int] of per-letter scores
      word -> word.map { scores.getOrElse(_, 0) }.reduce { _ + _ }
  }
  .collect

Example

val words = sc
  .textFile("input/books/11.txt.utf-8")
  .flatMap {
    _.filter(c => c.isLetter || c.isWhitespace)
      .toUpperCase.split(' ')
  }
  .filter { !_.isEmpty }
  .cache   // MEMORY_ONLY: both actions below reuse the deserialized in-memory RDD

val wordCount = words.countByValue.toSeq
val wordScore = words
  .distinct
  .map {
    word =>
      word -> word.map { scores.getOrElse(_, 0) }.reduce { _ + _ }
  }
  .collect

Example

import org.apache.spark.storage.StorageLevel

val words = sc
  .textFile("input/books/11.txt.utf-8")
  .flatMap {
    _.filter(c => c.isLetter || c.isWhitespace)
      .toUpperCase.split(' ')
  }
  .filter { !_.isEmpty }
  .persist(StorageLevel.DISK_ONLY)   // serialized to disk: saves memory, slower to read back

val wordCount = words.countByValue.toSeq
val wordScore = words
  .distinct
  .map {
    word =>
      word -> word.map { scores.getOrElse(_, 0) }.reduce { _ + _ }
  }
  .collect

Unpersisting

unpersist: drop a persisted RDD's partitions explicitly
LRU: otherwise Spark evicts cached partitions least-recently-used when memory runs low
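
A minimal sketch of explicit unpersisting, assuming the cached words RDD from the examples above:

val wordCount = words.countByValue.toSeq   // action: materializes and caches the RDD

words.unpersist()   // release the cached partitions once they are no longer needed
// without this call, Spark only drops cached blocks in LRU order when memory fills up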

Partitioning

HashPartitioner
RangePartitioner
custom (sketched at the end of this section)

Partitioning

val list = sc.parallelize(
  List(8 -> "Kirk", 96 -> "Picard", 240 -> "Sisko", 400 -> "Janeway", 
      401 -> "Archer", 800 -> "Lorca"))

4 partitions
hash: _._1

Partitioning

val list = sc.parallelize(
  List(8 -> "Kirk", 96 -> "Picard", 240 -> "Sisko", 400 -> "Janeway", 
      401 -> "Archer", 800 -> "Lorca"))

4 partitions
hash: _._1

partition 0: [ 8, 96, 240, 400, 800 ]
partition 1: [ 401 ]
partition 2: [ ]
partition 3: [ ]
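
A minimal sketch of producing this layout explicitly with a HashPartitioner on the list RDD above:

import org.apache.spark.HashPartitioner

// Int keys hash to themselves, so partition = key % 4;
// 8, 96, 240, 400 and 800 are all divisible by 4, hence the skew towards partition 0
val pList = list.partitionBy(new HashPartitioner(4))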

Partitioning

val list = sc.parallelize(
  List(8 -> "Kirk", 96 -> "Picard", 240 -> "Sisko", 400 -> "Janeway", 
      401 -> "Archer", 800 -> "Lorca"))
val part = new RangePartitioner(4, list)
val pList = list.partitionBy(part)

Partitioning

val list = sc.parallelize(
  List(8 -> "Kirk", 96 -> "Picard", 240 -> "Sisko", 400 -> "Janeway", 
      401 -> "Archer", 800 -> "Lorca"))
val part = new RangePartitioner(4, list)
val pList = list.partitionBy(part)

partition 0: [ 8, 96 ]
partition 1: [ 240, 400 ]
partition 2: [ 401 ]
partition 3: [ 800 ]
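
Finally, a custom partitioner for the same list; the class name and the split at key 400 are purely illustrative:

import org.apache.spark.Partitioner

// keys below 400 go to partition 0, everything else is spread over the rest by hash
class EraPartitioner(parts: Int) extends Partitioner {
  require(parts >= 2)
  def numPartitions: Int = parts
  def getPartition(key: Any): Int = key match {
    case k: Int if k < 400 => 0
    case k: Int            => 1 + (k.hashCode.abs % (parts - 1))
    case _                 => 0
  }
}

val cList = list.partitionBy(new EraPartitioner(4))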