Efficient Parsing of Reactive Buffer Streams
It has been a while since Spring Framework 5.3 was released. One of the features in that release was a major overhaul of our Reactive Multipart support. In this blog post, we share some of the knowledge learned while working on this feature. Specifically, we focus on finding a token within a stream of byte buffers.
Multipart Form Data
Whenever you upload a file, your browser sends it — and other fields in the form — to the server as a multipart/form-data
message. The exact format of these messages is described in RFC 7578. If you submit a simple form with a single text field called foo
and a file selector called file
, the multipart/form-data
message looks something like this:
POST / HTTP/1.1
Host: example.com
Content-Type: multipart/form-data;boundary="boundary" (1)
--boundary (2)
Content-Disposition: form-data; name="foo" (3)
bar
--boundary (4)
Content-Disposition: form-data; name="file"; filename="lorum.txt" (5)
Content-Type: text/plain
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer iaculis metus id vestibulum nullam.
--boundary-- (6)
-
The
Content-Type
header of the message contains theboundary
parameter. -
The boundary is used to start the first part. It is preceded by
--
. -
The first part contains the value of the text field,
foo
, as can be seen in the part headers. The value of the field isbar
. -
The boundary is used to separate between the first and second part. Again, it is preceded by
--
. -
The second part contains the contents of the submitted file, named
lorum.txt
. -
The end of the message is indicated by the boundary. It is preceded and followed by
--
.
Finding the Boundaries
The boundary in a multipart/form-data
message is quite important. It is specified as a parameter of the Content-Type
header. When preceded by two hyphens (--
), the boundary indicates the beginning of a new part. When also followed by --
, the boundary indicates the end of the message.
Finding the boundary in the stream of incoming byte buffers is key when parsing multipart messages. Doing so seems simple enough:
private int indexOf(DataBuffer source, byte[] target) {
int max = source.readableByteCount() - target.length + 1;
for (int i = 0; i < max; i++) {
boolean found = true;
for (int j = 0; j < target.length; j++) {
if (source.getByte(i + j) != target[j]) {
found = false;
break;
}
}
if (found) {
return i;
}
}
return -1;
}
However, there is a complication:The boundary can be split across two buffers, which — in a Reactive environment — might not arrive at the same time. For example, given the sample multipart message shown earlier, the first buffer might contain the following:
POST / HTTP/1.1
Host: example.com
Content-Type: multipart/form-data;boundary="boundary"
--boundary
Content-Disposition: form-data; name="foo"
bar
--bou
While the next buffer contains the remainder:
ndary
Content-Disposition: form-data; name="file"; filename="lorum.txt"
Content-Type: text/plain
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer iaculis metus id vestibulum nullam.
--boundary--
If we inspect one buffer at the time, we can not find split boundaries like these. Instead, we need to find the boundary across multiple buffers.
One way to solve this problem would be to wait until all buffers have been received, join them, and locate the boundaries afterwards. The following example does so, using a sample stream and the indexOf
method defined earlier:
Flux<DataBuffer> stream = Flux.just("foo", "bar", "--boun", "dary", "baz")
.map(s -> factory.wrap(s.getBytes(UTF_8)));
byte[] boundary = "--boundary".getBytes(UTF_8);
Mono<Integer> result = DataBufferUtils.join(stream)
.map(joined -> indexOf(joined, boundary));
StepVerifier.create(result)
.expectNext(6)
.verifyComplete();
Using Reactor’s StepVerifier
, we see that the boundary starts at index 6.
There is one major downside to this approach: joining multiple buffers into one effectively stores the entire multipart message in memory. Multipart messages are primarily used to upload (large) files, so this is not a viable option. Instead, we need a smarter way to locate the boundary.
Knuth to the Rescue!
Luckily, such a way exists in the form of the Knuth–Morris–Pratt algorithm. The main idea behind this algorithm is that if we already matched several bytes of the boundary but the next byte does not match, we do not need to restart the from the beginning. To do so, the algorithm maintains state, in the form of a position in a precomputed table that contains the number of bytes that can be skipped after a mismatch.
In Spring Framework, we have implemented the Knuth-Morris-Pratt algorithm in the Matcher
interface, which you can obtain an instance of through DataBufferUtils::matcher
. You can also check out the source code.
Here, we use the Matcher
to give us the end indices of boundary
in stream
, using the same sample input as earlier:
Flux<DataBuffer> stream = Flux.just("foo", "bar", "--boun", "dary", "baz")
.map(s -> factory.wrap(s.getBytes(UTF_8)));
byte[] boundary = "--boundary".getBytes(UTF_8);
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(boundary);
Flux<Integer> result = stream.map(matcher::match);
StepVerifier.create(result)
.expectNext(-1)
.expectNext(-1)
.expectNext(-1)
.expectNext(3)
.expectNext(-1)
.verifyComplete();
Note that the Knuth-Morris-Pratt algorithm gives the end index of the boundary, which explains the test results: the boundary does not end until index 3 in the second-to-last buffer.
As can be expected, Spring Framework’s MultipartParser
makes heavy use of Matcher
, for
-
Finding the first boundary by looking for the boundary prefixed by
--
. -
Finding subsequent boundaries by looking for the boundary prefixed by
CRLF
(end of previous part) and--
. -
Finding the
CRLF
CRLF
separator between the part headers and part body.
If you need to find a series of bytes in a stream of byte buffers, give the Matcher
a try!