Speeding up your code when multiple cores aren’t an option

The common advice when Python is too slow is to switch to a low-level compiled language like Cython or Rust. But what do you do if that code is too slow? At that point you might start thinking about parallelism: using multi-threading or multi-processing so you can take advantage of multiple CPU cores.

But parallelism comes with its own set of complexities, and in any case some algorithms can only really be implemented in a single-threaded way. So what can you do?

As it turns out, there’s often still plenty of performance you can gain just by tweaking your low-level code. As a real-world example, in this article we’ll work through optimizing Floyd-Steinberg error diffusion dithering. The specific variant of the algorithm we’ll implement converts a grayscale image with values from 0 to 255 into an image with just two colors, black and white.

Because of the specifics of this algorithm, it’s quite difficult or perhaps even impossible to parallelize with threading. But you can still make it run faster.

This article is an excerpt from a book I’m working on that will help teach you how to optimize low-level code, the kind of code you’d write with C, Cython, or Rust. The goal is to help data scientists and scientists who normally write Python to understand how to make their compiled code faster.

The Floyd-Steinberg dithering algorithm

As explained in the Wikipedia article, we can use this algorithm to convert a grayscale image into just two colors, black and white. The algorithm rounds a pixel’s value to the nearest of the two extremes (0 for black or 255 for white). The difference between the original value and the rounded value, known as the error, is added to neighboring pixels, with the following distribution:

[ ...            , current pixel (rounded)   , + 7/16 of error, ... ]
[ + 3/16 of error, + 5/16 of error           , + 1/16 of error, ... ]

So the next pixel in the row gets 7/16 of the error, the pixel one row down gets 5/16 of the error, and so on. Once the current pixel is processed, the algorithm moves on to the next pixel, which now includes some of the error diffused from the previous pixel.
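
To make the weights concrete, here's a tiny standalone sketch of a single diffusion step, with made-up numbers; it's not part of the implementation we'll write below.

# One diffusion step with made-up numbers: a pixel with value 100 rounds to 0
# (black), leaving an error of 100 to spread to its neighbors.
old_value = 100
new_value = 0
error = old_value - new_value    # 100

to_right      = error * 7 // 16  # 43 goes to the next pixel in this row
to_down_left  = error * 3 // 16  # 18 goes one row down, one column to the left
to_down       = error * 5 // 16  # 31 goes one row down
to_down_right = error * 1 // 16  # 6 goes one row down, one column to the right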

One key issue with optimizing this algorithm is that it’s likely impossible to process pixels in parallel: each pixel’s final value is impacted by the calculations done on previous pixels. This suggests that using multiple threads for parallelism might be difficult or impossible; using SIMD may also be difficult (see below for a quick explanation of SIMD). Still, given a naive implementation there are some ways to speed things up.

Let’s load the libraries we’ll be using, as well as a test image, a 400×400 NumPy array of uint8s.

Like the book, the examples in this article all use Numba, so you only need to understand Python syntax. If you decorate a function with @numba.njit, Numba will compile it to low-level machine code at runtime. Even if you’re not using Numba, the same basic optimization techniques and concepts apply to C++, Rust, or other low-level compiled languages.

from numba import njit
import numpy as np
from skimage import io

image = io.imread("images/hallway.jpg")

Here’s what it looks like:

The original image

If this were more than an example, we’d want to benchmark the code with a variety of images and sizes, matching the variety of inputs we expect to encounter. For simplicity’s sake, however, we’ll stick to this single image.

In the following sections you’ll follow along as I start with a naive implementation, make it faster, reduce memory usage, and then optimize it some more. Some intermediate steps and failed experiments were omitted for clarity. Nonetheless, this is a real optimization exercise: these are all optimization ideas I came up with as I went along, as I had never optimized this algorithm before.

A naive implementation

Here’s the first version I implemented. I didn’t try to make it slow or fast: I just tried to implement the algorithm.

The code stores temporary results in 16-bit integers, because adding the error can make some pixels negative or bigger than 255, and neither case fits in an unsigned 8-bit integer. At the end I convert the result back into an 8-bit image, which is what the function is supposed to return.
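
If you haven't run into this before, here's a quick standalone demonstration (separate from the dithering code) of why uint8 isn't enough: out-of-range values silently wrap around.

row = np.array([250, 5], dtype=np.uint8)
print(row + np.uint8(10))         # 250 + 10 wraps around to 4, not 260
print(row - np.uint8(10))         # 5 - 10 wraps around to 251, not -5
print(row.astype(np.int16) - 10)  # an int16 copy keeps the real values: 240 and -5

With that in mind, here's the naive implementation: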

# Code that is decorated with numba.njit looks like Python code, but is
# actually compiled into machine code, at runtime. This is fast, low-level
# code!
@njit
def dither(img):
    # Allow negative values and wider range than a uint8 has:
    result = img.astype(np.int16)
    y_size = img.shape[0]
    x_size = img.shape[1]
    last_y = y_size - 1
    last_x = x_size - 1
    for y in range(y_size):
        for x in range(x_size):
            old_value = result[y, x]
            if old_value < 0:
                new_value = 0
            elif old_value > 255:
                new_value = 255
            else:
                new_value = np.uint8(np.round(old_value / 255.0)) * 255
            result[y, x] = new_value
            # We might get a negative value for the error:
            error = np.int16(old_value) - new_value
            if x < last_x:
                result[y, x + 1] += error * 7 // 16
            if y < last_y and x > 0:
                result[y + 1, x - 1] += error * 3 // 16
            if y < last_y:
                result[y + 1, x] += error * 5 // 16
            if y < last_y and x < last_x:
                result[y + 1, x + 1] += error // 16

    return result.astype(np.uint8)

baseline_result = dither(image)

Here’s what it looks like:

The dithered image

And here’s how long it takes to run:

Code Time to run (milliseconds)
dither(image) 2.3

Some concepts you should know

The optimizations we’ll use below rely on concepts that are covered in much more detail in the book this article is based on. To help you understand this article on its own, however, here are some quick explanations:

  • Instruction-level parallelism (ILP): Your CPU can transparently run multiple operations in parallel on a single CPU core, so long as they don’t depend on each other. In other words, ILP makes your code faster without any work on your part, at least when it’s possible; see the short sketch after this list.
  • Branch prediction: To help with ILP, your CPU will try to predict whether or not branches—if statements and the like—will be taken. If the prediction is wrong, your code will get a lot slower as the CPU needs to undo the speculative work it did.
  • SIMD (Single Instruction, Multiple Data): If the compiler emits the right CPU instructions, the CPU can do the same operation, for example adding a number, to multiple numbers at the same time with a single instruction.
  • Memory hierarchy: Reading and writing from memory is slow, so your CPU has caches (L1, L2, L3) to speed up access. Your CPU also has a small number of extra-fast “registers”, locations where it can keep small values like 64-bit numbers and run operations directly on the values.
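
To make the ILP idea a little more concrete, here's a minimal, illustrative sketch; it's not taken from the dithering code, and in practice the compiler may already apply a similar transformation to simple loops like this. Summing with a single accumulator creates a chain where every addition depends on the previous one, while several independent accumulators give the CPU work it can overlap on a single core.

@njit
def sum_single(values):
    # Every addition depends on the previous total, so the additions form
    # one long dependency chain the CPU has to execute in order:
    total = 0
    for i in range(len(values)):
        total += values[i]
    return total

@njit
def sum_four_accumulators(values):
    # Four independent running totals give the CPU additions that don't
    # depend on each other, so it can potentially overlap them:
    t0 = 0
    t1 = 0
    t2 = 0
    t3 = 0
    i = 0
    while i + 4 <= len(values):
        t0 += values[i]
        t1 += values[i + 1]
        t2 += values[i + 2]
        t3 += values[i + 3]
        i += 4
    # Add any leftover elements:
    while i < len(values):
        t0 += values[i]
        i += 1
    return t0 + t1 + t2 + t3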

All these concepts and how they can help you speed up your code will be covered in far more detail in the upcoming book.

Considering what to optimize

In general, we want to make the inner part of the loop as fast as possible. Looking at the error diffusion part of the code, instruction-level parallelism ought to help speed it up, given that each calculation is independent:

if x < last_x:
    result[y, x + 1] += error * 7 // 16
if y < last_y and x > 0:
    result[y + 1, x - 1] += error * 3 // 16
if y < last_y:
    result[y + 1, x] += error * 5 // 16
if y < last_y and x < last_x:
    result[y + 1, x + 1] += error // 16

What about all those branches—will branch misprediction make them slow? A little thought suggests that these branches are very predictable, because they depend only on the pixel’s location. Consider a 6×6 image: depending on where a pixel is, different combinations of branches will be taken. In the diagram below, pixels labeled with the same number take the same branches:

1 2 2 2 2 3
1 2 2 2 2 3
1 2 2 2 2 3
1 2 2 2 2 3
1 2 2 2 2 3
4 5 5 5 5 6

For example, pixels in zones 1 and 4 won’t be able to diffuse the error to the previous column, because there is no previous column. Therefore the relevant branch (if y < last_y and x > 0) won’t be taken.

In larger images, virtually all the pixels will be in zone 2, with the exact same branches taken, so the CPU ought to be able to reliably predict those branches. Thus a reasonable assumption is that the diffusion part of the code is running at a decent speed even in its current state.
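
To put a number on "virtually all": for our 400×400 test image, a quick back-of-the-envelope check (illustrative only) looks like this:

# What fraction of a 400x400 image falls in zone 2? Every row except the
# last, every column except the first and the last:
y_size, x_size = 400, 400
zone2_pixels = (y_size - 1) * (x_size - 2)
print(zone2_pixels / (y_size * x_size))  # ~0.99, i.e. about 99% of all pixels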

In contrast, the calculation of the error itself seems potentially slow:

if old_value < 0:
    new_value = 0
elif old_value > 255:
    new_value = 255
else:
    new_value = np.uint8(np.round(old_value / 255.0)) * 255

First, the branches depend on the values of the pixels, so they may be hard for the CPU to predict, and it’s not clear whether the compiler will generate branchless code. Second, there’s some relatively complex math going on: dividing by 255.0 and rounding a float seems like it would be slow.
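
We could guess, or we could look: once a @njit function has been compiled (that is, called at least once), Numba lets you inspect the machine code it generated. Here's one way to peek, shown only as a sketch, since reading the output takes some practice:

# dither() was compiled when we called it above; inspect_asm() returns the
# generated assembly for each type signature Numba compiled it for:
for signature, asm in dither.inspect_asm().items():
    print(signature)
    print(asm[:1000])  # the full listing is long, so only print the start

Whatever the generated code looks like, the rounding logic is the most promising thing to simplify next.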

Optimizing rounding

The naive rounding implementation addresses three possible cases for intermediate pixel values:

  1. Negative numbers should be rounded to 0 (black).
  2. Numbers larger than 255 should be rounded to 255 (white).
  3. Numbers in between should be rounded to the closer of 0 and 255.

All of this can be simplified into a single check: whether the pixel value is below the midpoint, 128. In Python: new_value = 0 if old_value < 128 else 255. Since new_value is assigned a value in either case, the hope is that the compiler will turn this into branchless code, so we don’t have to worry about the cost of branch misprediction.
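
If you want reassurance that this shortcut matches the original three-way rounding, a quick standalone check (plain Python, separate from the dithering code) can compare the two over a range of plausible intermediate values:

# Compare the original three-way rounding with the simple threshold across a
# range of intermediate values a pixel might plausibly take on:
for old_value in range(-300, 556):
    if old_value < 0:
        original = 0
    elif old_value > 255:
        original = 255
    else:
        original = round(old_value / 255.0) * 255
    simplified = 0 if old_value < 128 else 255
    assert original == simplified, (old_value, original, simplified)

With that confirmed, here's the updated dithering function: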

@njit
def dither2(img):
    result = img.astype(np.int16)
    y_size = img.shape[0]
    x_size = img.shape[1]
    last_y = y_size - 1
    last_x = x_size - 1
    for y in range(y_size):
        for x in range(x_size):
            old_value = result[y, x]
            # Branchless, simple rounding:
            new_value = 0 if old_value < 128 else 255
            result[y, x] = new_value
            error = old_value - new_value
            if x < last_x:
                result[y, x + 1] += error * 7 // 16
            if y < last_y and x > 0:
                result[y + 1, x - 1] += error * 3 // 16
            if y < last_y:
                result[y + 1, x] += error * 5 // 16
            if y < last_y and x < last_x:
                result[y + 1, x + 1] += error // 16
    return result.astype(np.uint8)

assert np.array_equal(dither2(image), baseline_result)

Here’s how long it takes to run:

Code Time to run (microseconds)
dither2(image) 830.7

That’s a lot better! Remember, 1000 microseconds is 1 millisecond.

Optimizing memory usage

While the code is now a lot faster, there’s another problem to consider: it’s using quite a lot of memory. The input image uses N bytes of memory, one uint8 per pixel. dither() and dither2() both allocate 3N bytes: an int16 array costs 2 bytes per pixel, and the final uint8 result array is an additional byte per pixel.
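
For our 400×400 test image, that works out roughly as follows (illustrative numbers):

# Rough memory accounting for the 400x400 test image:
n = image.nbytes                            # 160,000 bytes: one uint8 per pixel
int16_copy = image.astype(np.int16).nbytes  # 320,000 bytes: two bytes per pixel
uint8_result = image.nbytes                 # another 160,000 bytes for the output
print(n, int16_copy + uint8_result)         # roughly N bytes of input vs. 3N allocated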

We can improve on this by noticing that intermediate error accumulation only touches the current row and the next row, and that once a row is finished, its values fit in 8 bits and never change again. So we really only need to keep two rows’ worth of int16 data, and we can reuse the same memory as we traverse the image:

@njit
def dither3(img):
    result = np.empty(img.shape, dtype=np.uint8)
    # Temporary storage of current and next row's intermediate values:
    staging = img[0:2].astype(np.int16)
    y_size = img.shape[0]
    x_size = img.shape[1]
    last_x = x_size - 1
    for y in range(y_size):
        for x in range(x_size):
            old_value = staging[0, x]
            new_value = 0 if old_value < 128 else 255
            staging[0, x] = new_value
            error = old_value - new_value
            if x < last_x:
                staging[0, x + 1] += error * 7 // 16
            if x > 0:
                staging[1, x - 1] += error * 3 // 16
            staging[1, x] += error * 5 // 16
            if x < last_x:
                staging[1, x + 1] += error // 16

        # Copy current row of staging into result:
        result[y,:] = staging[0,:]
        # Prepare staging area for next iteration:
        staging[0,:] = staging[1,:]
        if y < y_size - 2:
            staging[1,:] = img[y + 2,:]
    return result

assert np.array_equal(dither3(image), baseline_result)

Here’s how long it takes to run:

Code Time to run (microseconds)
dither3(image) 909.6

That’s a little slower than dither2(), but still a lot faster than the original naive implementation. And now peak memory allocation is approximately N bytes, since the staging array doesn’t use much memory. Put another way, this version cuts memory usage by two-thirds.

Gaining back some performance

The new version is probably slower because it does a bunch of copying. Once processing the current row is done, the contents of the next row (staging[1]) have to be copied into the current row (staging[0]), in order to preserve the errors diffused to the next row. If we split our staging array into two, one for the current row and one for the next row, we can just swap those two arrays instead of copying the data.

@njit
def dither4(img):
    result = np.empty(img.shape, dtype=np.uint8)
    # Two arrays, one for staging the current row and one for the next:
    staging_current = img[0].astype(np.int16)
    staging_next = img[1].astype(np.int16)
    y_size = img.shape[0]
    x_size = img.shape[1]
    last_x = x_size - 1
    for y in range(y_size):
        for x in range(x_size):
            old_value = staging_current[x]
            new_value = 0 if old_value < 128 else 255
            staging_current[x] = new_value
            error = old_value - new_value
            if x < last_x:
                staging_current[x + 1] += error * 7 // 16
            if x > 0:
                staging_next[x - 1] += error * 3 // 16
            staging_next[x] += error * 5 // 16
            if x < last_x:
                staging_next[x + 1] += error // 16

        # Copy current row of staging into result:
        result[y,:] = staging_current[:]
        # Switch the next row to be the current one, and copy in the next row's
        # initial data from the original image:
        staging_current, staging_next = staging_next, staging_current
        if y < y_size - 2:
            staging_next[:] = img[y + 2,:]

    return result

assert np.array_equal(dither4(image), baseline_result)

Here’s how long it takes to run:

Code Time to run (microseconds)
dither4(image) 876.6

That’s slightly better.

Going even faster

We still have all those annoying if statements in the main loop, which are used to handle edge pixels. For example, if you’re in the first column, it’s not possible to diffuse the errors to the previous column.

We can get rid of those conditionals, though. The code currently has temporary staging arrays for accumulating errors, and there’s no reason they have to be the same size as the input or the result. We can just add an extra entry at the start and end of each staging array. As a result, edge pixels can behave the same way as non-edge pixels, and the conditionals disappear.

As an additional optimization, notice that copying staging_current into the result array isn’t actually necessary. Once a pixel is finalized, it won’t change, so we can just write directly to result and skip updating staging_current.

@njit
def dither5(img):
    result = np.empty(img.shape, dtype=np.uint8)
    y_size = img.shape[0]
    x_size = img.shape[1]
    # The staging arrays have an extra entry at the start and at the end, so
    # that we don't need conditionals to handle edge pixels.
    staging_current = np.zeros(x_size + 2, np.int16)
    staging_current[1:-1] = img[0]
    staging_next = np.zeros(x_size + 2, np.int16)

    for y in range(y_size):
        # Copy in the next row's data:
        if y < y_size - 1:
            staging_next[1:-1] = img[y + 1,:]

        for x in range(x_size):
            old_value = staging_current[x + 1]
            new_value = 0 if old_value < 128 else 255
            # This is the final result, so we can store it directly in the
            # result:
            result[y, x] = new_value
            error = old_value - new_value
            staging_current[x + 2] += error * 7 // 16
            staging_next[x] += error * 3 // 16
            staging_next[x + 1] += error * 5 // 16
            staging_next[x + 2] += error // 16

        # Switch the next row to be the current one:
        staging_current, staging_next = staging_next, staging_current

    return result

assert np.array_equal(dither5(image), baseline_result)

Here’s how long it takes to run:

Code Time to run (microseconds)
dither5(image) 816

Reducing memory reads and writes

We can do even better! We’re doing a lot of reading and writing from staging_current and staging_next, and this isn’t strictly necessary. If we can move some of the temporary integer values we’re accumulating to variables on the stack, they will likely end up being stored in CPU registers. Reading and writing from registers is faster than reading and writing memory, even when the CPU is using a fast cache to speed up the latter.

@njit
def dither6(img):
    result = np.empty(img.shape, dtype=np.uint8)
    y_size = img.shape[0]
    x_size = img.shape[1]
    staging_current = np.zeros(x_size + 2, np.int16)
    staging_current[1:-1] = img[0]
    staging_next = np.zeros(x_size + 2, np.int16)

    for y in range(y_size):
        right_pixel_error = 0
        downleft_prev_error = 0
        downleft_prevprev_error = 0
        for x in range(x_size):
            old_value = staging_current[x + 1] + right_pixel_error
            new_value = 0 if old_value < 128 else 255
            result[y, x] = new_value
            error = old_value - new_value
            right_pixel_error = error * 7 // 16
            # Now that we have all three sets of errors accumulated, store
            # them:
            staging_next[x] = (
                img[y + 1, x - 1] + downleft_prev_error + error * 3 // 16
            )
            # Accumulate errors for the next iteration:
            downleft_prev_error = downleft_prevprev_error + error * 5 // 16
            downleft_prevprev_error = error // 16

        # Update the final pixel in the next row; it only gets two diffused
        # errors, not three, so it doesn't get updated in the inner loop.
        staging_next[x_size] = img[y + 1, x_size - 1] + downleft_prev_error

        staging_current, staging_next = staging_next, staging_current

    return result

assert np.array_equal(dither6(image), baseline_result)

Comparing our updated code to the previous version:

  • Previously we read twice and wrote once to staging_current in every inner loop iteration. Now we only read once, and don’t write at all.
  • Previously we read and wrote to staging_next three times in every inner loop iteration, and overwrote it once per outer loop. Now we only write once in every inner loop iteration, and don’t read at all.

Here’s how long it takes to run:

Code Time to run (microseconds)
dither6(image) 602.7

This version of the code is slightly harder to understand, but hopefully it’s not too bad.

Relaxing our accuracy requirements

So far each optimized version made sure the results exactly matched the original version. This was helpful for catching bugs, but it’s not necessary. Dithering is a visual effect; most of the time, as long as it looks right, that’s all that matters.

If we’re willing to accept slightly different results, we can defer the division by 16 for the downward errors, dividing only once when the value is stored. Since the fractions get rounded slightly differently, the results are slightly different, but that’s fine.

@njit
def dither7(img):
    result = np.empty(img.shape, dtype=np.uint8)
    y_size = img.shape[0]
    x_size = img.shape[1]
    staging_current = np.zeros(x_size + 2, np.int16)
    staging_current[1:-1] = img[0]
    staging_next = np.zeros(x_size + 2, np.int16)

    for y in range(y_size):
        right_pixel_error = 0
        downleft_prev_error = 0
        downleft_prevprev_error = 0
        for x in range(x_size):
            old_value = staging_current[x + 1] + right_pixel_error
            new_value = 0 if old_value < 128 else 255
            result[y, x] = new_value
            error = old_value - new_value
            right_pixel_error = error * 7 // 16
            staging_next[x] = (
                img[y + 1, x - 1] + (downleft_prev_error + error * 3) // 16
            )
            # Don't divide by 16 yet, only do that when we store final version
            # in staging_next:
            downleft_prev_error = downleft_prevprev_error + error * 5
            downleft_prevprev_error = error

        staging_next[x_size] = (
            img[y + 1, x_size - 1] + downleft_prev_error // 16
        )

        staging_current, staging_next = staging_next, staging_current

    return result

Here’s what it looks like:

The dithered image

And here’s a comparison to previous versions:

Code Time to run (microseconds)
dither(image) 2,339.5
dither2(image) 827.9
dither3(image) 909.7  # A bit slower, but reduces memory usage by two-thirds
dither4(image) 875.3
dither5(image) 814.3
dither6(image) 601.7
dither7(image) 553.9

Can you make this code even faster? If you give it a try and find some additional tricks, let me know. Note that some optimizations may be easier to identify and try if you go back and start from an earlier version of the code.

And if you’d like to understand the concepts these optimizations rely on—ILP, branch prediction, SIMD, and more—sign up to get notified when the book is released.