Streaming and Filtering Data

This article was written for Iguana 5 so it may contain out of date references.

Iguana 5.0.5 and up offers several built-in data filters. You can encode or decode data in base-64 or base-16 (hex), encode/decode URI component text, and encode text for HTML documents. You can also compress and decompress data in GZip or BZip2 stream formats, or encrypt and decrypt data with AES.

For previous versions of Iguana, we have Lua modules for base64 encoding and hex encoding.

These filters are available in the new built-in “filter” module. The following table outlines the available filters. The “s” parameter of each function can be a simple string or a stream as described in a later section.

Filter Namespace Encoder Decoder
Base64 filter.base64 enc(s) dec(s)
Hex (base 16) filter.hex enc(s) dec(s)
HTML Text filter.html enc(s) N/A
URI Component filter.uri enc(s) dec(s)
GZip filter.gzip deflate(s) inflate(s)
BZip2 filter.bzip2 deflate(s) inflate(s)
AES filter.aes enc{data=s,key=key} dec{data=s,key=key}
Unix to Unix [1] filter.uuencoding enc(s, filename, [mode]) dec(s)

[1] – Unix to Unix was added in Iguana 5.0.11.

Each filter can operate on strings or streams. When used on strings, each function returns its input filtered as a string. With streams, each return a new stream that, when used as described below, will return the filtered result.

function main(Data)
  local Decoded = filter.base64.dec(Data)
  queue.push{data=Decoded}
end

Chaining Filters

Since filters return exactly the sort of thing they expect as input (strings or streams), they can be easily chained. For example, suppose you wanted to encrypt the contents of your HL7 messages before sending them out. The following snippet creates new messages with the same MSH and EVN segments as the original, and packages the entire original message into a new AES segment. The message is first encrypted with a shared key using AES, then base64 encoded.

-- AES Key, known to both parties.
local Key = '0123456789ABCDEF'

function main(Data)
   local Msg = hl7.parse{data=Data,vmd='generic.vmd'}
   local AesEncrypted = filter.aes.enc{data=Data,key=Key}
   local Base64Encoded = filter.base64.enc(AesEncrypted)
   local Out = hl7.message{vmd='aes_encoded.vmd',name='Encoded'}
   Out.MSH = Msg.MSH
   Out.EVN = Msg.EVN
   Out.AES[1] = util.md5(Base64Encoded)
   Out.AES[2] = Base64Encoded
   queue.push{data=Out:S()}
end

The recipient need only have a copy of the shared key to recreate the original message, even if the MSH and EVN segments change.

Note: The “aes_encoded.vmd” is not special, it just defines a message with an AES segment; you can always encode data into any segment you like, however you like.

-- AES key, known to both parties.
local Key = '0123456789ABCDEF'

function main(Data)
   local Msg = hl7.parse{data=Data,vmd='aes_encoded.vmd'}
   local Base64Encoded = Msg.AES[2]:S()
   if Msg.AES[1]:S() ~= util.md5(Base64Encoded) then
      error('MD5 checksum does not match')
   end
   local AesEncrypted = filter.base64.dec(Base64Encoded)
   local Out = filter.aes.dec{data=AesEncrypted,key=Key}
   Out = Out:gsub('%z+$','') -- Remove NULs added by AES
   queue.push{data=Out}
end

Streams

Streams reduce the amount of memory required to process large amounts of data. When dealing with only small amounts, they are unnecessary. The streams defined here apply mainly to the filter API described above.

The “stream” module below can be used to read and write files (or strings) in chunks, reducing the amount of memory required for processing. Without a stream, a 64MB file would require at least that much memory for processing, usually more. With streams, files of any size can be processed in small chunks—the module uses 64kB chunks.

An Example: Reading Compressed Files

The function filter.bzip.inflate() can be used to decompress BZip compressed streams as well as strings.

function main()
   local FileStream = stream.fromFile('input.gzip')
   local UnzipStream = filter.gzip.inflate(FileStream)
   local Data = stream.toString(UnzipStream)
   queue.push{data=Data}
end

Notice how we used stream.fromFile() to create a stream? We passed it into filter.bzip.inflate() instead of a string, and it gave us another stream back. Streams themselves don’t do anything until we put them somewhere like a file, or create a string from them with stream.toString().

Once we call stream.toString(), the data starts to flow from the file. It first passes through the BZip filter, where it is uncompressed, and then it is returned as a string. We could also have written the uncompressed data back to disk with stream.toFile().

Available Functions

Function Description
stream.fromString(string) Creates a stream from the string.
stream.fromFile(path [,mode]) Creates a stream from the file located at path.
stream.fromPipe(cmd) Starts cmd in a separate process and returns its output as a stream (see io.popen).
stream.fromSocket(sock) Creates a stream to read from an open socket (see net.tcp.connect).
stream.toString(stream, …) Reads and returns, as a string, all the data from stream.
stream.toFile(path, [mode], stream, …) Writes to the file, at path, all the data from stream.
stream.toPipe(cmd, stream, …) Starts cmd in a separate process with all the data from stream as its input (see io.popen).
stream.toSocket(sock, stream, …) Writes to the open socket, sock, all the data from the stream (see net.tcp.connect).
stream.filter(stream, filter) Advanced: Creates a new stream by applying a filter to the chunked output of stream (see the next section for details).

The “mode” in both stream.fromFile() and stream.toFile() describes how the file should be opened (using io.open). The “…” in the stream.toString() and stream.toFile() functions are extra parameters that are passed to the reader at the other end of the stream. What that means is explained next.

How It Works

It’s not magic, really! Streams are just functions. To read from a stream, just call the function. Each time it is called, it will return a (possibly empty) chunk of data (as a string) or nil when there’s nothing left—once it returns nil, subsequent calls will always return nil. This is an important convention that you must conform to if you decide to write your own stream functions.

When you call stream.toString() or stream.toFile(), you can supply extra arguments. These are passed along upstream to the reader—readers are typically created by stream.fromString() or stream.fromFile(). The readers in the “stream” module to not require any arguments, but your custom readers can take advantage of this.

In addition to creating custom readers, you can create custom filters using stream.filter(). stream.filter() creates a new stream function from an existing stream and a filter function you supply. The filter function will be passed the chunk (or nil) read from the stream and any extra arguments that were passed upstream. It must return the chunk (or nil) to send downstream.

The “stream” Module

-- 
-- Basic Streams: Strings, Files, Pipes and Sockets
--
-- Copyright (c) 2011-2012 iNTERFACEWARE Inc.
--

-- How much data to buffer between reads.
local buffer_size = 64*1024

-- Throw errors reported by io.open().
local function open(path, mode)
   local file, err = io.open(path, mode)
   if not file then
      error(err, 3)
   end
   return file
end

-- Throw errors reported by io.popen().
local function popen(cmd, mode)
   local file, err = io.popen(cmd, mode)
   if not file then
      error(err, 3)
   end
   return file
end

-- Stream from some open file (see fromFile, fromPipe).
local function fromFile(file)
   return function()
      local out
      if file then
         out = file:read(buffer_size)
         if not out then
            file:close()
            file = nil
         end
      end
      return out
   end
end

-- Stream to some open file (see toFile, toPipe).
local function toFile(file, from, ...)
   local chunk
   repeat
      chunk = from(...)
      if chunk then
         file:write(chunk)
      end
   until not chunk
   file:close()
end

--
-- Public API
--

stream = {}

-- stream.fromString(s)
--
-- Create a stream from a string.
--   's' - the string
--
-- e.g. stream.toFile('out.txt', stream.fromString(Data))
--
function stream.fromString(s)
   return function()
      local out
      if #s > 0 then
         out = s:sub(1,buffer_size)
         s = s:sub(buffer_size+1)
      end
      return out
   end
end

-- stream.toString(from, ...)
--
-- Write a stream to a string.
--   'from(...)' - the stream to read from
--
-- e.g. local s = stream.toString(stream.fromFile('in.txt'))
--
function stream.toString(from, ...)
   local out, chunk = {}, nil
   repeat
      chunk = from(...)
      if chunk then
         out[#out+1] = chunk
      end
   until not chunk
   return table.concat(out)
end

-- stream.fromFile(path [,mode])
--
-- Create a stream from a file.
--   'path' - the path of the file
--   'mode' - the mode to use (defaults to 'rb')
--
-- e.g. local s = stream.toString(stream.fromFile('in.txt'))
--
function stream.fromFile(path, mode)
   local file = open(path, mode or 'rb')
   return fromFile(file)
end

-- stream.toFile(path, from, ...)
-- stream.toFile(path, mode, from, ...)
--
-- Write a stream to a file.
--   'path'      - the path of the file
--   'mode'      - the mode to use (defaults to 'wb')
--   'from(...)' - the stream to read from
--
-- e.g. stream.toFile('out.txt', stream.fromString(Data))
--
function stream.toFile(path, mode, from, ...)
   if type(mode) == 'function' then
      return stream.toFile(path, 'wb', mode, from, ...)
   end
   local file = open(path, mode)
   return toFile(file, from, ...)
end

-- stream.fromPipe(cmd)
--
-- Create a stream from an external process.
--   'cmd' - the command to run and read from
--
-- e.g. local s = stream.toString(stream.fromPipe('ls -1'))
--
function stream.fromPipe(cmd)
   local file = popen(cmd, 'r')
   return fromFile(file)
end

-- stream.toPipe(cmd, from, ...)
--
-- Write a stream to an external process.
--   'cmd'       - the command to run and write to
--   'from(...)' - the stream to read from
--
-- e.g. stream.toPipe('openssl des -out out.tmp -k '..Key,
--                    stream.fromString(Data))
--
function stream.toPipe(cmd, from, ...)
   local file = popen(cmd, 'w')
   return toFile(file, from, ...)
end

-- stream.fromSocket(sock)
--
-- Create a stream from a TCP/IP connection.
--   'sock' - the connection to read from
--
-- e.g. local s = net.tcp.connect{...}
--      stream.toFile('big.hl7', stream.fromSocket(s))
--
function stream.fromSocket(sock)
   return function()
      return sock:recv()
   end
end

-- stream.toSocket(sock, from, ...)
--
-- Write a stream to a TCP/IP connection.
--   'sock'      - the connection to write to
--   'from(...)' - the stream to read from
--
-- e.g. local s = net.tcp.connect{...}
--      stream.toSocket(s, stream.fromFile('big.hl7'))
--
function stream.toSocket(sock, from, ...)
   while true do
      local chunk = from(...)
      if not chunk then break end
      sock:send(chunk)
   end
end

-- stream.filter(from, f)
--
-- Create a stream by attaching a filter to another stream.
--   'from' - the stream to read from
--   'f'    - the filter function
--
-- The filter (f) is called with each chunk (or nil) read
-- from the stream (from) and must return chunks (or nil)
-- to be sent downstream.
--
-- e.g. local Out = stream.toString(
--         stream.filter(stream.fromString(Data),
--             function(s)
--                return s and s:upper()
--             end))
--      assert(Out == Data:upper())
--
function stream.filter(from, f)
   return function(...)
      local out = from(...)
      return f(out, ...)
   end
end

Leave A Comment?