Enhancing Data Processing in Browsers Using Streams

September 2018

Konstantin Möllers
Baqend

$ whoami

Konstantin Möllers
Performance Engineer @ Baqend
Master of Science @ Uni Hamburg
6 years of professional experience

Streams API

WHATWG Living Standard

https://streams.spec.whatwg.org/

  • Reading Data
  • Writing Data
  • Piping Data

Enabled Use Cases

  • Realtime data processing
  • Edge Side Includes
  • Video analysis and effects
  • Decompression and decryption
  • Image manipulation and decoding


Reading from Streams

Arbitrary Underlying Source → ReadableStream → Reader

A ReadableStream

  • is integrated into the Fetch API
  • has a Promise-based interface
  • has a mutually exclusive reader
  • can be teed into two streams
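
The teeing and locking behaviour listed above can be tried without a network request. The following is a minimal sketch, assuming a runtime that exposes ReadableStream as a global (current browsers, Node.js 18 or later); makeStream, collect and exampleTee are hypothetical helper names, not part of the Streams API.

```javascript
// Hypothetical helper: builds a small in-memory stream for illustration
function makeStream(chunks) {
  return new ReadableStream({
    start(ctrl) {
      for (const chunk of chunks) ctrl.enqueue(chunk)
      ctrl.close()
    },
  })
}

// Hypothetical helper: reads a stream to the end and collects its chunks
async function collect(rs) {
  const reader = rs.getReader()
  const chunks = []
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    chunks.push(value)
  }
  reader.releaseLock()
  return chunks
}

async function exampleTee() {
  const rs = makeStream(['a', 'b', 'c'])

  // tee() locks the original stream and returns two independent branches
  const [branch1, branch2] = rs.tee()
  const lockedAfterTee = rs.locked

  // Both branches receive every chunk
  const [first, second] = await Promise.all([collect(branch1), collect(branch2)])
  return { lockedAfterTee, first, second }
}
```

Calling exampleTee() should resolve with both arrays containing 'a', 'b' and 'c', while the original stream reports itself as locked once it has been teed.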

Consuming a ReadableStream from Fetch API


async function consumeStreamAsyncAwait(url) {
  const response = await fetch(url) // Fetch a URL
  const rs = response.body          // Stream the body
  const reader = rs.getReader()     // Get a lock on the stream

  while (true) {
    // Await more data and return if we are done reading
    const { done, value } = await reader.read()
    if (done) break

    // Do something with the data
    console.log(value)
  }

  // Unlock the stream
  reader.releaseLock()
}
						

Using AsyncIterator of ES2018


async function consumeStreamAsyncIterator(url) {
  const response = await fetch(url)         // Fetch a URL
  const rs = response.body                  // Stream the body
  const iterator = streamAsyncIterator(rs)  // Lock and create iterator

  // Await more data while iterating
  for await (const value of iterator) {
    // Do something with the data
    console.log(value)
  }
}
        

Implementation of streamAsyncIterator


async function* streamAsyncIterator(rs) {
  // Get a lock on the stream
  const reader = rs.getReader()
  try {
    while (true) {
      // Read from the stream and exit if we're done
      const { done, value } = await reader.read()
      if (done) return
      // Else yield the chunk
      yield value
    }
  }
  finally {
    // Unlock the stream
    reader.releaseLock()
  }
}
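
The try/finally in the generator matters when the consumer stops early. The following sketch repeats streamAsyncIterator so that it is self-contained and feeds it an in-memory stream; exampleEarlyExit is a hypothetical name, and a global ReadableStream (current browsers, Node.js 18 or later) is assumed.

```javascript
async function* streamAsyncIterator(rs) {
  const reader = rs.getReader()
  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) return
      yield value
    }
  } finally {
    reader.releaseLock()
  }
}

async function exampleEarlyExit() {
  // In-memory stream with three chunks, used for illustration only
  const rs = new ReadableStream({
    start(ctrl) {
      ctrl.enqueue(1)
      ctrl.enqueue(2)
      ctrl.enqueue(3)
      ctrl.close()
    },
  })

  const seen = []
  for await (const value of streamAsyncIterator(rs)) {
    seen.push(value)
    // Leaving the loop early runs the generator's finally block
    if (value === 2) break
  }
  const lockedAfterLoop = rs.locked

  // The stream was only unlocked, not cancelled: a new reader can continue
  const { value: next } = await rs.getReader().read()
  return { seen, lockedAfterLoop, next }
}
```

Because the lock is released but the stream is not cancelled, the remaining chunk is still available to the next reader.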
        

Creating Custom Readable Streams


interface UnderlyingSource {
  /* Is called immediately when the stream is constructed */
  start(ctrl: ReadableStreamDefaultController): void | Promise<void>

  /* Is called whenever the stream's internal queue has room for more chunks */
  pull(ctrl: ReadableStreamDefaultController): void | Promise<void>

  /* Is called when the consumer cancels the stream */
  cancel(reason: any): void | Promise<void>
}
      

Creating Custom Readable Streams


const rs = new ReadableStream({
  /* Is called immediately when the stream is constructed */
  async start(ctrl: ReadableStreamDefaultController): Promise<void> {
    try {
      // chunk1, chunk2 and chunk3 are placeholders for your own data
      ctrl.enqueue(chunk1)  // Enqueue a new chunk in the stream
      ctrl.enqueue(chunk2)
      ctrl.enqueue(chunk3)

      /* ... */

      ctrl.close()         // Tell the consumer we have sent all chunks
    } catch (err) {
      ctrl.error(err)      // Forward-propagate an error
    }
  },
})
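
The example above pushes all chunks from start(). For comparison, here is a minimal sketch of a pull-based source that produces chunks only when the consumer asks for them. makeCounterStream and exampleCounter are hypothetical names, and a global ReadableStream (current browsers, Node.js 18 or later) is assumed.

```javascript
// Hypothetical example: a stream of the integers 1..limit, produced lazily
function makeCounterStream(limit) {
  let next = 1
  return new ReadableStream({
    // Called whenever the internal queue has room for another chunk
    pull(ctrl) {
      if (next > limit) {
        ctrl.close()  // No more data: signal the end of the stream
        return
      }
      ctrl.enqueue(next++)
    },
  })
}

async function exampleCounter() {
  const reader = makeCounterStream(3).getReader()
  const values = []
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    values.push(value)
  }
  reader.releaseLock()
  return values
}
```

With a pull-based source the producer never runs ahead of the queue's high-water mark, which is how backpressure reaches the underlying source.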
      
ReadableStream browser support: Chrome 43+, Edge 16+, Opera 30+, Safari 10+

Writing to Streams

Arbitrary Underlying Sink
Writer
WritableStream

A WritableStream

  • is not natively integrated (as of today)
  • also has a Promise-based interface
  • has a mutually exclusive writer

Creating Custom Writable Streams


const ws = new WritableStream({
  /* Is called immediately when the stream is constructed */
  async start(ctrl: WritableStreamDefaultController): Promise<void> {},

  /* Is called when a new chunk is ready to be written to the sink */
  async write(chunk: any,
              ctrl: WritableStreamDefaultController): Promise<void> {},

  /* Is called when the user is done writing (receives no arguments) */
  async close(): Promise<void> {},

  /* Is called when the user aborts writing */
  async abort(reason: any): Promise<void> {},
})
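
To see these callbacks fire, the following minimal sketch writes to an in-memory sink through a writer. makeCollectingSink and exampleWriter are hypothetical names, and a global WritableStream (current browsers, Node.js 18 or later) is assumed.

```javascript
// Hypothetical sink that records everything written to it
function makeCollectingSink() {
  const received = []
  let closed = false
  const ws = new WritableStream({
    write(chunk) {
      received.push(chunk)
    },
    close() {
      closed = true
    },
  })
  return { ws, received, isClosed: () => closed }
}

async function exampleWriter() {
  const { ws, received, isClosed } = makeCollectingSink()

  // Lock the stream; only this writer can write until it releases the lock
  const writer = ws.getWriter()

  await writer.ready           // Wait until the stream can accept chunks
  await writer.write('hello')  // Resolves once the sink has handled the chunk
  await writer.write('world')
  await writer.close()         // Triggers the sink's close() callback

  return { received, closed: isClosed() }
}
```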
        

Creating a Writable WebSocket Stream


function makeWritableWebSocketStream(url, protocols) {
  const socket = new WebSocket(url, protocols)  // Create the WebSocket
  return new WritableStream({
    start(ctrl) {   // Wait for the socket to be opened
      return new Promise((resolve, reject) => {
        socket.onopen = () => resolve()
        // Fail the stream if the connection cannot be established
        socket.onerror = () => reject(new Error('WebSocket connection failed'))
      })
    },

    write(chunk) {  // Pass the chunk to the socket
      socket.send(chunk)
    },

    close() {       // Close the socket when closing the stream
      return new Promise((resolve) => {
        socket.onclose = () => resolve()
        socket.close(1000)
      })
    },

    abort() {       // Also close the socket when the stream is aborted
      socket.close()
    },
  })
}
        

Using pipeTo with Writable Streams


async function examplePipingTo() {
  // Create a writable stream to a WebSocket
  const ws = makeWritableWebSocketStream('ws://localhost:8080/')
  // Fetch a JSON document
  const response = await fetch('https://localhost/some.json')

  // Pipe the JSON's stream to the WebSocket and wait until
  // all data has been written (rejects if either side fails)
  await response.body.pipeTo(ws)
}
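
The same piping works without a server. The following minimal sketch pipes an in-memory readable stream into an in-memory sink; examplePipeToInMemory is a hypothetical name, and global ReadableStream and WritableStream (current browsers, Node.js 18 or later) are assumed.

```javascript
async function examplePipeToInMemory() {
  // Source with two chunks, used for illustration only
  const rs = new ReadableStream({
    start(ctrl) {
      ctrl.enqueue('x')
      ctrl.enqueue('y')
      ctrl.close()
    },
  })

  // Sink that records what it receives
  const received = []
  let closed = false
  const ws = new WritableStream({
    write(chunk) {
      received.push(chunk)
    },
    close() {
      closed = true
    },
  })

  // pipeTo() resolves once the source is exhausted and, by default,
  // the destination has been closed as well
  await rs.pipeTo(ws)
  return { received, closed }
}
```

By default pipeTo() closes the destination when the source ends; passing { preventClose: true } as the second argument keeps it open for further writes.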
        
WritableStream browser support: Chrome 59+, Edge 16+, Opera 46+

Transforming Streams with Pipes

WritableStream → Transformer → ReadableStream (together: TransformStream)

A TransformStream

  • will be integrated into the Encoding API
  • lets readable streams be piped through it
  • creates a writable and readable stream pair

Creating Custom Transform Streams


const ts = new TransformStream({
  /* Is called immediately when the stream is constructed */
  async start(ctrl: TransformStreamDefaultController): Promise<void> {},

  /* Is called when a new chunk is sent to be transformed */
  async transform(chunk: any,
                  ctrl: TransformStreamDefaultController): Promise<void> {},

  /* Is called when all chunks have been sent and we have to finish */
  async flush(ctrl: TransformStreamDefaultController): Promise<void> {},
})
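
As an illustration of how transform() and flush() work together, here is a minimal sketch of a transformer that re-chunks incoming strings into lines. makeLineSplitter and exampleLineSplitter are hypothetical names, and global ReadableStream and TransformStream (current browsers, Node.js 18 or later) are assumed.

```javascript
// Hypothetical transformer: turns arbitrary string chunks into whole lines
function makeLineSplitter() {
  let buffer = ''
  return new TransformStream({
    transform(chunk, ctrl) {
      buffer += chunk
      const lines = buffer.split('\n')
      buffer = lines.pop()  // Keep the incomplete last line for later
      for (const line of lines) ctrl.enqueue(line)
    },
    flush(ctrl) {
      // Emit whatever is left once the input has ended
      if (buffer !== '') ctrl.enqueue(buffer)
    },
  })
}

async function exampleLineSplitter() {
  // The line "bc" is split across two input chunks
  const rs = new ReadableStream({
    start(ctrl) {
      ctrl.enqueue('a\nb')
      ctrl.enqueue('c\nd')
      ctrl.close()
    },
  })

  const reader = rs.pipeThrough(makeLineSplitter()).getReader()
  const lines = []
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    lines.push(value)
  }
  return lines
}
```

The final line 'd' has no trailing newline, so it only reaches the consumer because flush() emits the remaining buffer.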
        

Using Identity Transform Streams


function fetchUploadStreamed() {
  // Create an identity transform stream and get its
  // readable and writable stream pair
  const { writable, readable } = new TransformStream()

  // Link the uploaded body with the readable stream; a request with a body
  // needs a non-GET method, and stream bodies require duplex: 'half'
  fetch('...', { method: 'POST', body: readable, duplex: 'half' })
    .then(response => { /* ... */ })

  // Lock the writer of the writable stream and write data to it
  const writer = writable.getWriter()
  writer.write(new Uint8Array([0x73, 0x74, 0x72, 0x65, /* ... */]))
  writer.close()
}
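
An identity transform stream can also be tried locally, which makes its backpressure visible. The following is a minimal sketch, assuming a global TransformStream (current browsers, Node.js 18 or later) and the default queuing strategies; exampleIdentityBackpressure is a hypothetical name.

```javascript
async function exampleIdentityBackpressure() {
  const { writable, readable } = new TransformStream()
  const writer = writable.getWriter()
  const reader = readable.getReader()

  // Start writing without awaiting here: with the default strategies a
  // write() can stay pending until the readable side is actually read
  const writing = (async () => {
    await writer.write('a')
    await writer.write('b')
    await writer.close()
  })()

  // Read concurrently so that the pending writes can make progress
  const received = []
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    received.push(value)
  }

  await writing
  return received
}
```

Awaiting the writes before starting to read would be likely to stall in this setup, because nothing would ever drain the readable side.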
        

Using pipeThrough with Transform Streams


async function examplePipeThrough() {
  // Create a transform stream that splits Uint8Arrays into single bytes
  const transformStream = new TransformStream({
    transform(chunk, ctrl) {
      chunk.forEach(byte => ctrl.enqueue(byte))
    }
  })

  // Get a readable stream
  const response = await fetch('...')
  const readableStream = response.body

  // Pipe it through our transform stream
  const byteStream = readableStream.pipeThrough(transformStream)

  /* ... */
}
        
TransformStream browser support: Chrome 67+, Opera 54+

Outlook: Using a TextDecoderStream


async function exampleTextDecoding() {
  // Create a text decoder transform stream to decode Uint8Arrays
  // to UTF-8 strings
  const transformStream = new TextDecoderStream('utf-8')

  // Get a readable stream
  const response = await fetch('...')
  const readableStream = response.body

  // Pipe it through our transform stream
  const stringStream = readableStream.pipeThrough(transformStream)

  /* ... */
}
        
Will come with Chrome 71 and Opera 58
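
One reason to decode as a stream is that multi-byte characters may be split across chunks. The following is a minimal sketch, assuming a runtime that already ships TextDecoderStream as a global (for example Node.js 18 or later); exampleDecodeSplitCharacter is a hypothetical name.

```javascript
async function exampleDecodeSplitCharacter() {
  // "Hi €" in UTF-8, with the three bytes of "€" (E2 82 AC)
  // split across two chunks
  const rs = new ReadableStream({
    start(ctrl) {
      ctrl.enqueue(new Uint8Array([0x48, 0x69, 0x20, 0xe2]))
      ctrl.enqueue(new Uint8Array([0x82, 0xac]))
      ctrl.close()
    },
  })

  // The decoder keeps incomplete byte sequences until the rest arrives
  const reader = rs.pipeThrough(new TextDecoderStream('utf-8')).getReader()
  let text = ''
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    text += value
  }
  return text
}
```

Decoding each chunk separately with an independent, non-streaming decoder would produce replacement characters at the chunk boundary instead of the euro sign.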

Demonstration

Polyfilling Targa (.tga) image format for browsers

Hosted Targa Image → Transforming Service Worker Using Streams → Client Rendering PNG

Service Workers:
Programmable proxies in the browser

github.com/ksm2/targa-service-worker

Who We Are

Our Product

Speed Kit:

  • Accelerates any website
  • Pluggable
  • Easy Setup

test.speed-kit.com

Our Services

  • Web & Data Management Consulting
  • Performance Auditing
  • Implementation Services

consulting@baqend.com

Our Other Talks

  • Coming up here at 15:15
    Vue.js in Echtzeit: Live-Coding einer Realtime-Anwendung
    by Hannes Kuhlmann
  • Yesterday 12:30
    The Technology Behind Progressive Web Apps
    by Erik Witt
  • Yesterday 15:45
    Real-Time Processing Explained: Surveying Storm, Samza, Spark & Flink

slides.baqend.com

Thanks for Watching!

Konstantin Möllers
ksm@baqend.com
@ksm2
@k_moellers
