hjf.io

Taming the Beast: Efficiently Processing Large S3 Objects with Node.js Lambda

26th Feb - 2024

I wrote a while back on dealing with a large dataset in Node. I failed pretty badly at first, but managed to make my software more memory efficient. I’ve since stumbled on the same problem while building some software for AWS Lambda.

Corporate software engineering is a weird beast. It’s a strange mix of poor requirements, strict(-ish) design patterns and even stricter language choices.

In my case, I had to read an object from S3 (about 800,000 lines), parse it and send a few hundred thousand messages to AWS SQS.

Naturally, you’d go for a slightly naive approach:

import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3'

export const handler = async (event) => {
	const client = new S3Client({ region: process.env.AWS_REGION })
	
	const record = event.Records[0]
	
	const bucket = record.s3.bucket.name
	const { key } = record.s3.object
	
	const response = await client.send(
		new GetObjectCommand({ Bucket: bucket, Key: key }),
	)
	
	// transformToString reads the entire object into memory as one string
	const file = await response.Body.transformToString()
	
	await doSomethingWithFile(file)
	
	return {
		statusCode: 201,
		body: 'Created',
	}
}

But with this, the lambda falls over. It runs out of memory. Enter streaming. If you’re unfamiliar with streaming, the concept is: You ask the service (S3, the filesystem etc.) to push chunks of the file to your software. You process what comes in, without filling your memory like a hungry Slack instance.
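
To make the idea concrete, here’s a minimal sketch of that push model using Node’s filesystem streams (the file path is just a placeholder): you receive small chunks as they’re read, rather than one giant string.

import { createReadStream } from 'node:fs'

// Ask for the file as a stream of chunks rather than one big string
const stream = createReadStream('./some-large-file.csv')

stream.on('data', (chunk) => {
	// Each chunk is a small Buffer; only this much sits in memory at a time
	console.log(`got ${chunk.length} bytes`)
})

stream.on('end', () => {
	console.log('done')
})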

Streaming Files

Streaming files into a lambda has a few nuances. I’ll get into those and illustrate them with some software. Your next, less naive, approach could look something like this:

const DELIM = '\n'
export const handler = async (event) => {
	// request code omitted for brevity
	
	const response = await client.send(command)
	
	let buffer = ''
	
	response.Body.on('data', async (chunk) => {
		buffer += chunk.toString()
		const records = buffer.split(DELIM)
		buffer = records.pop()
		await doSomethingWithLines(records)
	})
	
	response.Body.on('end', async () => {
		if (buffer.length) 
			await doSomethingWithLines([buffer])
	})
	
	return {
		statusCode: 201,
		body: 'Created',
	}
}

Can you spot the issue here? I couldn’t, and neither could our team. We were stuck for a bit until we realised: Body.on(...) attaches an event listener to the stream. It’s non-blocking, and so the execution of the lambda looks like this (see the sketch after the list):

  1. Get a handle on the stream

  2. Define a buffer

  3. Attach a listener for the data event

  4. Attach a listener for the end event

  5. Serve the client an HTTP 201
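
Here’s a tiny standalone sketch of the same trap (nothing to do with S3; the stream and the “work” are made up): the async listener’s promise is simply discarded, so the surrounding function returns long before the processing finishes.

import { Readable } from 'node:stream'

async function run() {
	const stream = Readable.from(['line 1\n', 'line 2\n'])

	stream.on('data', async (chunk) => {
		// Pretend this is slow processing work
		await new Promise((res) => setTimeout(res, 1000))
		console.log('processed', chunk.toString().trim())
	})

	// Nothing awaits the listeners, so we fall straight through to here
	console.log('run() is already returning')
}

run()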


We need to block until we’ve finished processing the whole file. We can take a bit of inspiration from Go’s waitgroups. Waitgroups are useful when you spawn off goroutines in Go and need them to finish before finalising a part of your software’s execution.

This is pretty applicable here: although Node is single-threaded, we can pretend that the events on our event loop are threads. Let’s set up waitgroups for those “threads”.

Firstly, we need waitgroups. Let’s create a contrived class for it:

export class Waitgroup {
	constructor() {
		this.delta = 0
	}
	
	add() {
		this.delta += 1
	}
	
	done() {
		this.delta -= 1
	}
	
	wait() {
		// Poll until every add() has been matched by a done()
		return new Promise((res) => {
			const interval = setInterval(() => {
				if (this.delta !== 0) return
				clearInterval(interval)
				res(null)
			}, 100)
		})
	}
}
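
As a quick sanity check, here’s roughly how the class behaves on its own, assuming the Waitgroup above is in scope (the timeouts just stand in for async work):

const wg = new Waitgroup()

wg.add()
setTimeout(() => {
	console.log('first job done')
	wg.done()
}, 500)

wg.add()
setTimeout(() => {
	console.log('second job done')
	wg.done()
}, 1500)

await wg.wait()
console.log('all jobs finished')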

Cool. We can finally wait for our processing:

const DELIM = '\n'
export const handler = async (event) => {
	// request code omitted for brevity
	
	const response = await client.send(command)
	const wg = new Waitgroup()
	wg.add()
	
	let buffer = ''
	
	async function process(records) {
		wg.add()
		await doSomethingWithLines(records)
		wg.done()
	}
	
	response.Body.on('data', async (chunk) => {
		buffer += chunk.toString()
		const records = buffer.split(DELIM)
		buffer = records.pop()
		await process(records)
	})
	
	response.Body.on('end', async () => {
		if (buffer.length)
			await process([buffer])
	
		wg.done()
	})
	
	await wg.wait()
	
	return {
		statusCode: 201,
		body: 'Created',
	}
}

Every time we fire off a promise to process part of the file, we now wait for it. The execution of the lambda is held up by the wg.wait() polling call until the whole file has been handled.

Summary

Overall, this is a nice pattern to use if you’re dealing with large files and have a memory limitation in Node. In our tests, we were able to process 800k records and send 800k messages in 3 minutes. Not bad for a lambda!

We could have changed a few things if we had more freedom, however. Speaking of Golang, we could have changed our programming language. A POC I ran comparing this streaming/parsing in Node and Go saw a pretty substantial improvement in performance: Go was about 2500% faster when parsing files 800k lines long. Right now, support for Go in our environment isn’t there. I’m working on that, though.

It’s also possible that we could encounter a deadlock and have the lambda run until it hit its time limit: if doSomethingWithLines(...) throws an error, how can we tell the Waitgroup not to wait for that group of work? We ended up building a robust error handling strategy and extending the Waitgroup class to deal with exceptions.
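
Our actual error handling isn’t shown here, but one way to avoid the deadlock is to let the group record a failure so wait() can stop blocking. A rough, hypothetical sketch (the SafeWaitgroup name and fail() method are made up for illustration):

export class SafeWaitgroup extends Waitgroup {
	constructor() {
		super()
		this.error = null
	}

	fail(err) {
		// Record the first failure so wait() can bail out
		this.error = this.error ?? err
	}

	wait() {
		return new Promise((res, rej) => {
			const interval = setInterval(() => {
				if (this.error) {
					clearInterval(interval)
					return rej(this.error)
				}
				if (this.delta !== 0) return
				clearInterval(interval)
				res(null)
			}, 100)
		})
	}
}

The process() helper could then catch anything thrown by doSomethingWithLines(...) and call wg.fail(err), instead of letting the exception vanish inside an event listener.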

We could have also used containers. We could have allocated more memory to the process and spread the work across processes with Node’s child_process module.

We could have just chunked the input files into something smaller and easier to work with, like 10k lines each. That would have let us take advantage of Lambda concurrency for some performant message-sending.
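
For illustration only (chunkLines is a made-up helper, not something from our codebase), the chunking idea might look like this:

// Split parsed lines into 10k-line chunks so each chunk can be handed
// to its own lambda invocation
const CHUNK_SIZE = 10_000

export function chunkLines(lines) {
	const chunks = []
	for (let i = 0; i < lines.length; i += CHUNK_SIZE) {
		chunks.push(lines.slice(i, i + CHUNK_SIZE))
	}
	return chunks
}

// each chunk could then be written back to S3 (or pushed onto a queue)
// to fan out across concurrent lambdas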

Ultimately, processing all of these files in a lambda is fast enough, and budget isn’t a concern. It fits our stack, so I can’t argue too much. It was a fun few days, at least. It’s fun to hack with constraints! Especially if they’re weird. I’m a big fan of cheap cloud compute. Go ahead, hack away for yourself. Drop a comment and let me know how you’d approach this!

Resources