Async download

Introduction

During the last chapter, we set up the required pipelining infrastructure for the CPU and GPU steps of computation to execute in parallel with each other. This took us from a situation where the CPU and GPU kept waiting for each other…

Timeline of lockstep CPU/GPU execution

…to a situation where it is only the fastest device that awaits the slowest device:

Timeline of CPU/GPU execution with async results processing

However, the above diagrams are simplifications of a more complex hardware reality, where both the CPU and GPU work are composed of many independent components that can operate in parallel. By introducing more parallelism on either side, we should be able to improve our performance further. But how should we focus this effort?

On the CPU side, we mentioned multiple times that command buffer recording is relatively cheap. We therefore do not expect a massive performance benefit from offloading it to a different thread, and because of thread synchronization costs it could even be a net loss. Beyond that, the other CPU tasks that we could consider parallelizing are all I/O-related:

  • NVMe storage and ramdisks like /dev/shm are fast enough to benefit from the use of multiple I/O threads. But unfortunately the HDF5 library does not know how to correctly leverage multithreading for I/O, so we are stuck with our choice of file format here.
  • What we do control is the copy of GPU results from the main CPU thread to the I/O thread, which is currently performed in a sequential manner. But parallelizing memory-bound tasks like this is rather difficult, and we only expect significant benefits in I/O-bound workloads that write to a ramdisk, which are arguably a bit of an edge case.

Taking all this into consideration, further CPU-side parallelization does not look very promising at this point. But what about GPU-side parallelization? That’s a deep topic, so in this chapter we will start with the first stage of GPU execution, which is the submission of commands from the CPU.

Right from this stage, a typical high-end modern GPU provides at least 2-3 flavors of independent hardware command submission interfaces, which get exposed as queue families in Vulkan:

  • “Main” queues support all operations including traditional live 3D rendering.
  • “Asynchronous compute” queues can execute compute commands and data transfer operations in parallel with the main queues when enough resources are available.
  • Dedicated “DMA” queues, if available, can only execute data transfer commands but have a higher chance to be able to operate in parallel with the other queues.

Knowing this, we would now like to leverage this extra hardware parallelism, if available, in order to go from our current sequential GPU execution model…

GPU timeline without DMA

…to a state that leverages the hardware concurrent data transfer capabilities if available:

GPU timeline with DMA

The purpose of this chapter will be to take us from here to there. From this optimization, we expect to improve the simulation’s performance by up to 2x in the best-case scenario where computation steps and GPU-to-CPU data transfers take roughly the same amount of time, and the application performance is limited by the speed at which the GPU does its work (as opposed to e.g. the speed at which data gets written to a storage device).
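As a rough sanity check on this 2x figure, here is a minimal sketch of the underlying arithmetic. It assumes an idealized model (not a measurement) where the GPU is the bottleneck, each output image costs `t_compute` of simulation time plus `t_download` of transfer time, and the two overlap perfectly once they run on separate queues. The function name and parameters are hypothetical.

```rust
/// Idealized speedup from overlapping GPU compute with GPU-to-CPU downloads.
///
/// Hypothetical helper for illustration only: assumes that the GPU is the
/// bottleneck and that compute and transfers overlap perfectly.
fn ideal_overlap_speedup(t_compute: f64, t_download: f64) -> f64 {
    // Without overlap, each output image costs compute + download time
    let sequential = t_compute + t_download;
    // With perfect overlap, the slower of the two operations sets the pace
    let overlapped = t_compute.max(t_download);
    sequential / overlapped
}
```

Under this model the speedup peaks at 2x when both durations are equal, and shrinks toward 1x as one of them dominates, for example 4/3 when computation takes three times longer than the download.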

Transfer queue

To perform asynchronous data transfers, the first thing that we are going to need is some access to the GPU’s dedicated DMA (if available) or asynchronous compute queues.1 To get there, we will need to break an assumption that has been hardcoded in our codebase for a while, namely that a single Queue ought to be enough for anybody.

The first step in this journey will be for us to rename our queue family selection function to highlight its compute-centric nature:

fn compute_queue_family_index(device: &PhysicalDevice) -> u32 {
    device
        .queue_family_properties()
        .iter()
        .position(|family| family.queue_flags.contains(QueueFlags::COMPUTE))
        .expect("device does not support compute (or graphics)") as u32
}

We will then write a second version of this function that attempts to pick up the device’s dedicated DMA queue (if available) or its asynchronous compute queue (otherwise). This is hard to do in a hardware-agnostic way, but one fairly reliable trick is to pick the queue family that supports data transfers and as few other Vulkan operations as possible:

/// Pick up the queue family that has the highest chance of supporting
/// asynchronous data transfers, and is distinct from the main compute queue
fn transfer_queue_family_index(device: &PhysicalDevice, compute_idx: u32) -> Option<u32> {
    use QueueFlags as QF;
    device
        .queue_family_properties()
        .iter()
        .enumerate()
        .filter(|(idx, family)| {
            // Per the Vulkan specification, if a queue family supports graphics
            // or compute, then it is guaranteed to support data transfers and
            // does not need to advertise support for it. Why? Ask Khronos...
            *idx != compute_idx as usize
                && family
                    .queue_flags
                    .intersects(QF::TRANSFER | QF::COMPUTE | QF::GRAPHICS)
        })
        .min_by_key(|(idx, family)| {
            // Among the queue families that support data transfers, we prioritize
            // the queue families that...
            //
            // - Explicitly advertise support for data transfers
            // - Advertise support for as few other operations as possible
            // - Come as early in the queue family list as possible
            let flags = family.queue_flags;
            let flag_priority = if flags.contains(QF::TRANSFER) { 0 } else { 1 };
            let specialization = flags.count();
            (flag_priority, specialization, *idx)
        })
        .map(|(idx, _family)| idx as u32)
}
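
To get a feel for how this ranking behaves, the following sketch applies the same filter and sort key to plain bitmasks instead of vulkano's `QueueFlags`. The constants mirror the Vulkan queue flag bit values, but the function name and the device layouts used to exercise it are made up for illustration and do not describe any particular GPU.

```rust
// Queue capability bits, mirroring the Vulkan queue flag bit values
const GRAPHICS: u32 = 0b0001;
const COMPUTE: u32 = 0b0010;
const TRANSFER: u32 = 0b0100;
const SPARSE_BINDING: u32 = 0b1000;

/// Same ranking as `transfer_queue_family_index()`, applied to a slice that
/// holds one capability bitmask per queue family (hypothetical stand-in for
/// the vulkano queue family properties)
fn pick_transfer_family(families: &[u32], compute_idx: usize) -> Option<usize> {
    families
        .iter()
        .copied()
        .enumerate()
        // Keep families that can transfer data and are not the compute family
        .filter(|&(idx, flags)| {
            idx != compute_idx && flags & (TRANSFER | COMPUTE | GRAPHICS) != 0
        })
        // Prefer explicit transfer support, then fewest flags, then lowest index
        .min_by_key(|&(idx, flags)| {
            let flag_priority = if flags & TRANSFER != 0 { 0 } else { 1 };
            (flag_priority, flags.count_ones(), idx)
        })
        .map(|(idx, _flags)| idx)
}
```

With a hypothetical three-family device that exposes a main family, an asynchronous compute family and a transfer-only family, this picks the transfer-only family. If the transfer-only family is absent it falls back to the asynchronous compute family, and with a single family it returns `None`.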

We will then modify our device setup code to allocate one queue from each of these families…

/// Initialized `Device` with associated queues
struct DeviceAndQueues {
    device: Arc<Device>,
    compute_queue: Arc<Queue>,
    transfer_queue: Option<Arc<Queue>>,
}
//
impl DeviceAndQueues {
    /// Set up a device and associated queues
    fn new(device: Arc<PhysicalDevice>) -> Result<Self> {
        // Prepare to request queues
        let mut queue_create_infos = Vec::new();
        let queue_create_info = |queue_family_index| QueueCreateInfo {
            queue_family_index,
            ..Default::default()
        };

        // Create a compute queue, and a transfer queue if distinct
        let compute_family = compute_queue_family_index(&device);
        queue_create_infos.push(queue_create_info(compute_family));
        if let Some(transfer_family) = transfer_queue_family_index(&device, compute_family) {
            queue_create_infos.push(queue_create_info(transfer_family))
        }

        // Set up the device and queues
        let (device, mut queues) = Device::new(
            device,
            DeviceCreateInfo {
                queue_create_infos,
                ..Default::default()
            },
        )?;

        // Get the compute queue, and the transfer queue if available
        let compute_queue = queues
            .next()
            .expect("we asked for a compute queue, we should get one");
        let transfer_queue = queues.next();
        Ok(Self {
            device,
            compute_queue,
            transfer_queue,
        })
    }
}

…and finally, we are going to integrate this new nuance into our context struct.

/// Basic Vulkan setup that all our example programs will share
pub struct Context {
    pub device: Arc<Device>,
    pub compute_queue: Arc<Queue>,
    pub transfer_queue: Option<Arc<Queue>>,
    pipeline_cache: PersistentPipelineCache,
    pub mem_allocator: Arc<MemoryAllocator>,
    pub desc_allocator: Arc<DescriptorSetAllocator>,
    pub comm_allocator: Arc<CommandBufferAllocator>,
    _messenger: Option<DebugUtilsMessenger>,
}
//
impl Context {
    /// Set up a `Context`
    pub fn new(
        options: &ContextOptions,
        quiet: bool,
        progress: Option<ProgressBar>,
    ) -> Result<Self> {
        let library = VulkanLibrary::new()?;
        let mut logging_instance = LoggingInstance::new(library, &options.instance, progress)?;
        let physical_device =
            select_physical_device(&logging_instance.instance, &options.device, quiet)?;
        let DeviceAndQueues {
            device,
            compute_queue,
            transfer_queue,
        } = DeviceAndQueues::new(physical_device)?;
        let pipeline_cache = PersistentPipelineCache::new(device.clone())?;
        let (mem_allocator, desc_allocator, comm_allocator) = setup_allocators(device.clone());
        let _messenger = logging_instance.messenger.take();
        Ok(Self {
            device,
            compute_queue,
            transfer_queue,
            pipeline_cache,
            mem_allocator,
            desc_allocator,
            comm_allocator,
            _messenger,
        })
    }

    // [ ... same as before ... ]
}

After this is done, we can let compiler errors guide us into replacing every occurrence of context.queue in our program with context.compute_queue. And with that, we will get a program that still only uses a single Vulkan compute queue… but now possibly allocates another data transfer queue that we will be able to leverage next.

Triple buffering

Theory

Having reached the point where we do have access to a data transfer queue, we could proceed to adjust our command buffer creation and queue submission logic to perform asynchronous data transfers. But then we would once again get a rustc or vulkano error, get puzzled by that error, think it through, and after a while thank these tools for saving us from ourselves again.

Indeed, asynchronous data transfers are yet another application of the general concept of pipelining optimizations. And like all pipelining optimizations, they require some kind of resource duplication in order to work. Without such duplication, we will once again face a data race:

Timeline of a DMA-driven data race

In case the above diagram does not make it apparent, the problem with our current memory management strategy is that if we try to download the UV1 version of the V species’ concentration while running more simulation steps, and our download takes more time than one simulation step, then subsequent simulation steps will eventually end up overwriting the data that we are in the process of transferring to the CPU, resulting in yet another data race.

We can avoid this data race by allocating a third set of (U, V) concentration buffers on the GPU side. This will allow us to have not just one, but three pairs of (U, V) datasets, where each pair leaves out one of the three (U, V) datasets…

  • The (UV2, UV3) pair does not contain UV1
  • The (UV1, UV3) pair does not contain UV2
  • The (UV1, UV2) pair does not contain UV3

…and thus, by leveraging each of these pairs at the right time during our simulation steps, we will be able to keep computing new simulation steps on the GPU without overwriting the version of the (U, V) concentration data that is in the process of being transferred to the CPU side:

Timeline of race-free DMA through triple buffering

But needless to say, getting there will require us to make “a few” changes to the logic that we use to manage our GPU-side concentration data storage. Which will be the topic of the next section.

Implementation

Previously, our Concentrations type was effectively a double buffer of InOut, where each InOut contained a Vulkan descriptor set (for compute pipeline execution purposes) and a V input buffer (for output downloading purposes).

Now that we are doing triple buffering, however, we would need to have six such InOut structs, with redundant V input buffers, and that starts to feel wasteful. So instead of doing that, we will extract the descriptor set setup logic out of InOut…

/// Set up a descriptor set that uses a particular `(U, V)` dataset as
/// inputs and another `(U, V)` dataset as outputs
fn setup_descriptor_set(
    context: &Context,
    layout: &PipelineLayout,
    in_u: Subbuffer<[Float]>,
    in_v: Subbuffer<[Float]>,
    out_u: Subbuffer<[Float]>,
    out_v: Subbuffer<[Float]>,
) -> Result<Arc<DescriptorSet>> {
    // Determine how the descriptor set will bind to the compute pipeline
    let set_layout = layout.set_layouts()[INOUT_SET as usize].clone();

    // Configure what resources will attach to the various bindings
    // that the descriptor set is composed of
    let descriptor_writes = [
        WriteDescriptorSet::buffer_array(IN, 0, [in_u, in_v]),
        WriteDescriptorSet::buffer_array(OUT, 0, [out_u, out_v]),
    ];

    // Set up the descriptor set according to the above configuration
    let descriptor_set = DescriptorSet::new(
        context.desc_allocator.clone(),
        set_layout,
        descriptor_writes,
        [],
    )?;
    Ok(descriptor_set)
}

…and trash the rest of InOut, replacing it with a new DoubleUV struct that is just a double buffer of descriptor sets that does not even handle buffer allocation duties anymore:

/// Double-buffered chemical species concentration storage
///
/// This manages Vulkan descriptor sets associated with a pair of `(U, V)`
/// chemical concentrations datasets, such that...
///
/// - At any point in time, one `(U, V)` dataset serves as inputs to the active
///   GPU compute pipeline, and another `(U, V)` dataset serves as outputs.
/// - A normal simulation update takes place by calling the
///   [`update()`](Self::update) method, which invokes a user-specified callback
///   with the descriptor set associated with the current input/output
///   configuration, then flips the role of the buffers (inputs become outputs
///   and vice versa).
/// - The [`current_descriptor_set()`](Self::current_descriptor_set) method can be used to query the
///   current descriptor set _without_ flipping the input/output roles. This is
///   useful for the triple buffering logic discussed below.
struct DoubleUV {
    /// If we denote `(U1, V1)` the first `(U, V)` concentration dataset and
    /// `(U2, V2)` the second concentration dataset...
    ///
    /// - The first "forward" descriptor set uses `(U1, V1)` as its inputs and
    ///   `(U2, V2)` as its outputs.
    /// - The second "reverse" descriptor set uses `(U2, V2)` as its inputs and
    ///   `(U1, V1)` as its outputs.
    descriptor_sets: [Arc<DescriptorSet>; 2],

    /// Truth that the "reverse" descriptor set is being used
    reversed: bool,
}
//
impl DoubleUV {
    /// Set up a `DoubleUV`
    fn new(
        context: &Context,
        layout: &PipelineLayout,
        u1: Subbuffer<[Float]>,
        v1: Subbuffer<[Float]>,
        u2: Subbuffer<[Float]>,
        v2: Subbuffer<[Float]>,
    ) -> Result<Self> {
        let forward = setup_descriptor_set(
            context,
            layout,
            u1.clone(),
            v1.clone(),
            u2.clone(),
            v2.clone(),
        )?;
        let reverse = setup_descriptor_set(context, layout, u2, v2, u1, v1)?;
        Ok(Self {
            descriptor_sets: [forward, reverse],
            reversed: false,
        })
    }

    /// Currently selected descriptor set
    fn current_descriptor_set(&self) -> Arc<DescriptorSet> {
        self.descriptor_sets[self.reversed as usize].clone()
    }

    /// Run a simulation step and flip the input/output roles
    fn update(&mut self, step: impl FnOnce(Arc<DescriptorSet>) -> Result<()>) -> Result<()> {
        step(self.current_descriptor_set())?;
        self.reversed = !self.reversed;
        Ok(())
    }
}
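
The flip-after-success behavior of `update()` can be checked in isolation with a simplified stand-in. In the sketch below, the `PingPong` type is hypothetical and string labels replace the Vulkan descriptor sets, but the control flow is the same as in `DoubleUV`.

```rust
/// Simplified stand-in for `DoubleUV` (hypothetical type, for illustration):
/// string labels play the role of the forward and reverse descriptor sets
struct PingPong {
    sets: [&'static str; 2],
    reversed: bool,
}

impl PingPong {
    /// Currently selected "descriptor set"
    fn current(&self) -> &'static str {
        self.sets[self.reversed as usize]
    }

    /// Run a step, and flip the input/output roles only if it succeeded
    fn update<E>(
        &mut self,
        step: impl FnOnce(&'static str) -> Result<(), E>,
    ) -> Result<(), E> {
        // The `?` returns early on error, before the roles get flipped
        step(self.current())?;
        self.reversed = !self.reversed;
        Ok(())
    }
}
```

One consequence of this design is that a failed step leaves the roles untouched, so the same descriptor set would be handed out again on the next call.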

All the complexity of triple buffering will be handled by the top-level Concentrations struct, which is the only one that has all the information needed to make correct triple-buffering decisions. As the associated logic is sophisticated, we will help maintenance with a fair amount of doc comments:

/// Triple-buffered chemical species concentration storage
///
/// This manages three `(U, V)` chemical concentration datasets in such a way
/// that the following properties are guaranteed:
///
/// - At any point in time, a normal simulation update can be performed using
///   the [`update()`] method. This method will...
///   1. Invoke a user-specified callback with a descriptor set that goes from
///      one of the inner `(U, V)` datasets (let's call it `(Ui, Vi)`) to
///      another dataset (let's call it `(Uj, Vj)`).
///   2. Update the `Concentrations` state in such a way that the next time
///      [`update()`] is called, the descriptor set that will be provided will
///      use the former output dataset (called `(Uj, Vj)` above) as its input,
///      and another dataset as its output (can either be the same `(Ui, Vi)`
///      dataset used as input above, or the third `(Uk, Vk)` dataset).
/// - At any point in time, the [`lock_current_v()`](Self::lock_current_v)
///   method can be called as a preparation for downloading the current V
///   species concentration. This method will...
///   1. Return the [`Subbuffer`] associated with the current V species
///      concentration, i.e. the buffer that the next [`update()`] call will use
///      as its V concentration input.
///   2. Update the `Concentrations` state in such a way that subsequent calls
///      to [`update()`] will not use this buffer as their V species
///      concentration output. The other two `(U, V)` datasets will be used in a
///      double-buffered fashion instead.
///
/// The underlying logic assumes that the user will wait for the current data
/// download to complete before calling `lock_current_v()` again.
///
/// [`update()`]: Self::update
pub struct Concentrations {
    /// `Vi` buffer from each of the three `(Ui, Vi)` datasets that we manage.
    v_buffers: [Subbuffer<[Float]>; 3],

    /// Double buffers that leverage a pair of `(U, V)` datasets, such that the
    /// double buffer at index `i` within this array **does not** use the `(Ui,
    /// Vi)` dataset per the `v_buffers` indexing convention.
    ///
    /// For example, the double buffer at index 1 does not use the `(U1, V1)`
    /// dataset, it alternates between the `(U0, V0)` and `(U2, V2)` datasets.
    double_buffers: [DoubleUV; 3],

    /// Currently selected entry of `double_buffers`, used for `update()` calls
    /// after any `transitional_set` has been taken care of.
    double_buffer_idx: usize,

    /// Descriptor set used to perform the transition between two entries of the
    /// `double_buffers` array.
    ///
    /// Most of the time, simulation updates are fully managed by one
    /// [`DoubleUV`], i.e. they keep going from one `(Ui, Vi)` dataset to
    /// another `(Uj, Vj)` dataset and back, while the third `(Uk, Vk)` dataset
    /// is unused. However, when [`lock_current_v()`] gets called...
    ///
    /// - If we denote `(Ui, Vi)` the dataset currently used as input, it must
    ///   not be modified as long as the download scheduled by
    ///   [`lock_current_v()`] is ongoing. Therefore, the next simulation
    ///   updates must be performed using another double buffer composed of the
    ///   other two `(Uj, Vj)` and `(Uk, Vk)` datasets.
    /// - Before this can happen, we need to take one simulation step that uses
    ///   `(Ui, Vi)` as input and one of `(Uj, Vj)` or `(Uk, Vk)` as output, so
    ///   that one of these can become the next simulation input.
    ///
    /// `transitional_set` is the descriptor set that is used to perform the
    /// simulation step described above, before we can switch back to our
    /// standard double buffering logic.
    ///
    /// [`lock_current_v()`]: Self::lock_current_v
    transitional_set: Option<Arc<DescriptorSet>>,
}

The implementation of this new Concentrations struct starts with the buffer allocation stage, which is largely unchanged compared to what InOut::allocate_buffers() used to do. We simply allocate 6 buffers instead of 4, because now we need three U storage buffers and three V storage buffers:

impl Concentrations {
    /// Allocate the set of buffers that we are going to use
    fn allocate_buffers(
        options: &RunnerOptions,
        context: &Context,
    ) -> Result<[Subbuffer<[Float]>; 6]> {
        use BufferUsage as BU;
        let padded_rows = padded(options.num_rows);
        let padded_cols = padded(options.num_cols);
        let new_buffer = || {
            Buffer::new_slice(
                context.mem_allocator.clone(),
                BufferCreateInfo {
                    usage: BU::STORAGE_BUFFER | BU::TRANSFER_DST | BU::TRANSFER_SRC,
                    ..Default::default()
                },
                AllocationCreateInfo::default(),
                (padded_rows * padded_cols) as DeviceSize,
            )
        };
        Ok([
            new_buffer()?,
            new_buffer()?,
            new_buffer()?,
            new_buffer()?,
            new_buffer()?,
            new_buffer()?,
        ])
    }

    // [ ... ]
}

The new version of the create_and_schedule_init() constructor will not be that different from its predecessor at a conceptual level. But it will have a few more buffers to initialize, and will also need to set the stage for triple buffering shenanigans to come.

impl Concentrations {
    // [ ... ]

    /// Set up GPU data storage and schedule GPU buffer initialization
    ///
    /// GPU buffers will only be initialized after the command buffer associated
    /// with `cmdbuild` has been built and submitted for execution. Any work
    /// that depends on their initial value must be scheduled afterwards.
    pub fn create_and_schedule_init(
        options: &RunnerOptions,
        context: &Context,
        pipelines: &Pipelines,
        cmdbuild: &mut CommandBufferBuilder,
    ) -> Result<Self> {
        // Allocate all GPU storage buffers used by the simulation
        let [u0, v0, u1, v1, u2, v2] = Self::allocate_buffers(options, context)?;

        // Keep around the three V concentration buffers from these datasets
        let v_buffers = [v0.clone(), v1.clone(), v2.clone()];

        // Set up three (U, V) double buffers such that the double buffer at
        // index i uses the 4 buffers that do not include (Ui, Vi), and the
        // "forward" direction of each double buffer goes from the dataset of
        // lower index to the dataset of higher index.
        let layout = &pipelines.layout;
        let double_buffers = [
            DoubleUV::new(
                context,
                layout,
                u1.clone(),
                v1.clone(),
                u2.clone(),
                v2.clone(),
            )?,
            DoubleUV::new(
                context,
                layout,
                u0.clone(),
                v0.clone(),
                u2.clone(),
                v2.clone(),
            )?,
            DoubleUV::new(context, layout, u0.clone(), v0.clone(), u1, v1)?,
        ];

        // Schedule the initialization of the first simulation input
        //
        // - We will initially work with the double buffer at index 0, which is
        //   composed of datasets (U1, V1) and (U2, V2).
        // - We will initially access it in order, which means that the (U1, V1)
        //   dataset will be the first simulation input.
        // - Due to the above, to initialize this first simulation input, we
        //   need to bind a descriptor set that uses (U1, V1) as its output.
        // - This is true of the reverse descriptor set within the first double
        //   buffer, in which (U2, V2) is the input and (U1, V1) is the output.
        let double_buffer_idx = 0;
        cmdbuild.bind_pipeline_compute(pipelines.init.clone())?;
        cmdbuild.bind_descriptor_sets(
            PipelineBindPoint::Compute,
            pipelines.layout.clone(),
            INOUT_SET,
            double_buffers[double_buffer_idx].descriptor_sets[1].clone(),
        )?;
        let num_workgroups = |domain_size: usize, workgroup_size: NonZeroU32| {
            padded(domain_size).div_ceil(workgroup_size.get() as usize) as u32
        };
        let padded_workgroups = [
            num_workgroups(options.num_cols, options.pipeline.workgroup_cols),
            num_workgroups(options.num_rows, options.pipeline.workgroup_rows),
            1,
        ];
        // SAFETY: GPU shader has been checked for absence of undefined behavior
        //         given a correct execution configuration, and this is one
        unsafe {
            cmdbuild.dispatch(padded_workgroups)?;
        }

        // Any dataset other than the (U1, V1) initial input must have its edges
        // initialized to zeros.
        //
        // Only the edges need to be initialized. The values at the center of
        // the dataset do not matter, as these buffers will serve as simulation
        // outputs at least once (which will initialize their central values)
        // before they serve as a simulation input.
        //
        // Here we initialize the entire buffer to zero, as the Vulkan
        // implementation is likely to special-case this buffer-zeroing
        // operation with a high-performance implementation.
        cmdbuild.fill_buffer(u0.reinterpret(), 0)?;
        cmdbuild.fill_buffer(v0.reinterpret(), 0)?;
        cmdbuild.fill_buffer(u2.reinterpret(), 0)?;
        cmdbuild.fill_buffer(v2.reinterpret(), 0)?;

        // Once the command buffer is executed, everything will be ready
        Ok(Self {
            v_buffers,
            double_buffers,
            double_buffer_idx,
            transitional_set: None,
        })
    }

    // [ ... ]
}

For reasons that will become clear later on, we will drop the idea of having a current_inout() function. Instead, we will only have…

  • An update() method that is used when performing simulation steps, as before.
  • A new lock_current_v() method that is used when scheduling a V concentration download.

We will describe the latter first, as its internal logic will motivate the existence of the new transitional_set data member of the Concentrations struct, which the update() method will later need to use in an appropriate manner.


As mentioned above, lock_current_v() should be called after scheduling a set of simulation steps, in order to lock the current (U, V) dataset for the duration of the final GPU-to-CPU download. This will prevent the associated V buffer from being used as a simulation output for the duration of the download, and provide the associated Subbuffer to the caller so that it can initiate the download:

impl Concentrations {
    // [ ... ]

    /// Lock the current V species concentrations for a GPU-to-CPU download
    fn lock_current_v(&mut self) -> Subbuffer<[Float]> {
        // [ ... ]
    }

    // [ ... ]
}

The implementation of lock_current_v() starts with a special case which we will not discuss yet, which handles the scenario where two GPU-to-CPU downloads are initiated in quick succession without any simulation step in between. After this, we get code for the general case where some simulation steps have occurred after the previous GPU-to-CPU download.

First of all, we use the index of the current double buffer and its reversed flag to determine the indices of the current input and output buffers, using the indexing convention introduced by the Concentrations::create_and_schedule_init() constructor:

// [ ... handle special case of two consecutive downloads ... ]

let initial_double_buffer_idx = self.double_buffer_idx;
let (mut initial_input_idx, mut initial_output_idx) = match initial_double_buffer_idx {
    0 => (1, 2),
    1 => (0, 2),
    2 => (0, 1),
    _ => unreachable!("there are only three double buffers"),
};
let initial_double_buffer = &self.double_buffers[initial_double_buffer_idx];
if initial_double_buffer.reversed {
    std::mem::swap(&mut initial_input_idx, &mut initial_output_idx);
}

From this, we trivially deduce which buffer within the Concentrations::v_buffers array is our current V input buffer. We keep a copy of it, that will be the return value of lock_current_v():

let input_v = self.v_buffers[initial_input_idx].clone();
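
The index bookkeeping above can be summarized as a small pure function, which makes the convention easy to test on its own. This is only a sketch: `current_io` is a hypothetical helper that is not part of the actual codebase, and it works on bare indices rather than on the real `DoubleUV` state.

```rust
/// Indices of the (input, output) datasets for a given double buffer state.
///
/// Hypothetical helper for illustration: the double buffer at index `i` skips
/// dataset `i`, and its forward direction goes from the dataset of lower
/// index to the dataset of higher index.
fn current_io(double_buffer_idx: usize, reversed: bool) -> (usize, usize) {
    let (low, high) = match double_buffer_idx {
        0 => (1, 2),
        1 => (0, 2),
        2 => (0, 1),
        _ => unreachable!("there are only three double buffers"),
    };
    // A reversed double buffer reads the higher dataset, writes the lower one
    if reversed { (high, low) } else { (low, high) }
}
```

For instance, right after initialization the simulation uses double buffer 0 in the forward direction, so `current_io(0, false)` yields `(1, 2)` and the V buffer to download is `v_buffers[1]`.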

The remainder of lock_current_v()’s implementation will then be concerned with ensuring that this V buffer is not used as a simulation output again as long as a GPU-to-CPU download is in progress. The indexing convention of the Concentrations::double_buffers array has been chosen such that this is just a matter of switching to the double buffer at index initial_input_idx…

let next_double_buffer_idx = initial_input_idx;

…but before we start using this double buffer, we must first perform a simulation step that takes the simulation input from our current input buffer (which is not part of the new double buffer) to another buffer (which will be part of the new double buffer since there are only three (U, V) pairs).

For example, assuming we are initially using the first double buffer (whose members are datasets (U1, V1) and (U2, V2)) and our current input buffer is (U1, V1), we will want to move to the second double buffer (whose members are datasets (U0, V0) and (U2, V2)) by performing one simulation step that goes from (U1, V1) to (U0, V0) or (U2, V2).

It should be apparent that the active descriptor set within the current double buffer (which, in the above example, performs a simulation step from dataset (U1, V1) to dataset (U2, V2)) will always go to one of the datasets within the new double buffer, so we can use it to perform the transition…

self.transitional_set = Some(initial_double_buffer.current_descriptor_set());

…but we must then configure the new double buffer’s reversed flag so that the first simulation step that uses it goes in the right direction. In the above example, after the transitional step that uses (U1, V1) as input and (U2, V2) as output, we want the next simulation step to go from (U2, V2) to (U0, V0), and not from (U0, V0) to (U2, V2).

This can be done by figuring out what will be the index of the input and output datasets in the first step that uses the new double buffer…

let next_input_idx = initial_output_idx;
let next_output_idx = match (next_double_buffer_idx, next_input_idx) {
    (0, 1) => 2,
    (0, 2) => 1,
    (1, 0) => 2,
    (1, 2) => 0,
    (2, 0) => 1,
    (2, 1) => 0,
    _ => {
        unreachable!(
            "there are only three double buffers, each of which has \
            one input and one output dataset whose indices differ from \
            the double buffer's index"
        )
    }
};

…and setting up the reversed flag of the new double buffer accordingly:

self.double_buffer_idx = next_double_buffer_idx;
self.double_buffers[next_double_buffer_idx].reversed = next_output_idx < next_input_idx;
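
As a side note, because the three dataset indices 0, 1 and 2 sum to 3, the lookup table used above is equivalent to a subtraction. The sketch below shows this alternative formulation together with the direction rule. Both helper names are hypothetical, and the explicit `match` used in the actual code arguably documents the intent better.

```rust
/// Index of the dataset that is neither `a` nor `b`, among datasets 0, 1, 2.
///
/// Hypothetical alternative to the lookup table: only valid for two distinct
/// indices in `0..3`, which is checked at runtime.
fn third_index(a: usize, b: usize) -> usize {
    assert!(a < 3 && b < 3 && a != b, "need two distinct indices in 0..3");
    // 0 + 1 + 2 == 3, so subtracting two indices leaves the remaining one
    3 - a - b
}

/// Direction flag of a double buffer whose next step goes from dataset
/// `input` to dataset `output`, knowing that the forward direction goes from
/// the lower dataset index to the higher one
fn is_reversed(input: usize, output: usize) -> bool {
    output < input
}
```

In the running example, the new double buffer index is 1 and its first input is dataset 2, so `third_index(1, 2)` gives output dataset 0, and `is_reversed(2, 0)` is `true` as expected for a step that goes from (U2, V2) to (U0, V0).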

Once the Concentrations is set up in this way, we can return the input_v that we previously recorded to the caller of lock_current_v(). And this is the end of the implementation of the lock_current_v() function.


The update() method’s implementation is then written in such a way that any pending transitional_set is used first before using the current double buffer, to make sure that any double buffer transition scheduled by lock_current_v() is performed before the new double buffer is used…

impl Concentrations {
    // [ ... ]

    /// Run a simulation step
    ///
    /// The `step` callback will be provided with the descriptor set that should
    /// be used for the next simulation step. If you need to carry out multiple
    /// simulation steps, you should call `update()` once per simulation step.
    pub fn update(&mut self, step: impl FnOnce(Arc<DescriptorSet>) -> Result<()>) -> Result<()> {
        if let Some(transitional_set) = self.transitional_set.take() {
            // If we need to transition from a freshly locked (U, V) dataset to
            // another double buffer, do so...
            step(transitional_set)
        } else {
            // ...otherwise, keep using the current double buffer
            self.double_buffers[self.double_buffer_idx].update(step)
        }
    }
}

…and because this uses Option::take(), there is a guarantee that the transitional simulation step will only be carried out once. After that, Concentrations will resume using a simple double-buffering logic as it did before the current V buffer was locked.
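To illustrate the run-once guarantee in isolation, here is a small sketch that uses a hypothetical `Stepper` type, with strings standing in for descriptor sets. Only `Option::take()` is real API here, everything else is made up for the example.

```rust
/// Hypothetical stand-in for `Concentrations`, with names instead of
/// descriptor sets
struct Stepper {
    transitional: Option<&'static str>,
}

impl Stepper {
    /// Name of the descriptor set that the next simulation step would use
    fn next_set(&mut self) -> &'static str {
        // take() moves the value out of the Option and leaves None behind,
        // so this branch can only be taken once per scheduled transition
        if let Some(set) = self.transitional.take() {
            set
        } else {
            "regular"
        }
    }
}
```

Starting from `Stepper { transitional: Some("transitional") }`, the first call to `next_set()` returns the transitional set and every later call falls back to the regular one.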


There is just one edge case to take care of. If two GPU-to-CPU downloads are started in quick succession, with no simulation step in between, then all the buffer-locking work has already been carried out by the lock_current_v() call that initiated the first GPU-to-CPU download.

As a result, we do not need to do anything to lock the current input buffer (which has been done already), and can simply figure out the input_v buffer associated with the current V concentration input and return it right away. That’s what the edge case handling at the beginning of lock_current_v() does, completing the implementation of this function:

impl Concentrations {
    // [ ... ]

    /// Lock the current V species concentrations for a GPU-to-CPU download
    fn lock_current_v(&mut self) -> Subbuffer<[Float]> {
        // If no update has been carried out since the last lock_current_v()
        // call, then we do not need to change anything in the current
        // Concentrations state, and can simply return the current V input
        // buffer again. Said input buffer is easily identified as the one that
        // future simulation updates will be avoiding.
        if self.transitional_set.is_some() {
            return self.v_buffers[self.double_buffer_idx].clone();
        }

        // Otherwise, determine the index of the input and output buffer of the
        // currently selected descriptor set
        let initial_double_buffer_idx = self.double_buffer_idx;
        let (mut initial_input_idx, mut initial_output_idx) = match initial_double_buffer_idx {
            0 => (1, 2),
            1 => (0, 2),
            2 => (0, 1),
            _ => unreachable!("there are only three double buffers"),
        };
        let initial_double_buffer = &self.double_buffers[initial_double_buffer_idx];
        if initial_double_buffer.reversed {
            std::mem::swap(&mut initial_input_idx, &mut initial_output_idx);
        }

        // The initial simulation input is going to be downloaded...
        let input_v = self.v_buffers[initial_input_idx].clone();

        // ...and therefore we must refrain from using it as a simulation output
        // in the future, which we do by switching to the double buffer that
        // does not use this dataset (by definition of self.double_buffers)
        let next_double_buffer_idx = initial_input_idx;

        // To perform this transition correctly, we must first perform one
        // simulation step that uses the current dataset as input and another
        // dataset as output. The current descriptor set within the double
        // buffer that we used before does take us from our initial (Ui, Vi)
        // input dataset to another (Uj, Vj) output dataset, and is therefore
        // appropriate for this purpose.
        self.transitional_set = Some(initial_double_buffer.current_descriptor_set().clone());

        // After this step, we will land into a (Uj, Vj) dataset that is part of
        // our new double buffer, but we do not know if it is the first or the
        // second dataset within this double buffer. And we need to know that in
        // order to set the "reversed" flag of the double buffer correctly.
        //
        // We already know what is the index of the dataset that serves as a
        // transitional output, and from that and the index of our new double
        // buffer, we can tell what is the index of the third dataset, i.e. the
        // other dataset within our new double buffer...
        let next_input_idx = initial_output_idx;
        let next_output_idx = match (next_double_buffer_idx, next_input_idx) {
            (0, 1) => 2,
            (0, 2) => 1,
            (1, 0) => 2,
            (1, 2) => 0,
            (2, 0) => 1,
            (2, 1) => 0,
            _ => {
                unreachable!(
                    "there are only three double buffers, each of which has \
                    one input and one output dataset whose indices differ from \
                    the double buffer's index"
                )
            }
        };

        // ...and given that, we can easily tell if the first step within our
        // next double buffer, after the transitional step, will go in the
        // forward or reverse direction.
        self.double_buffer_idx = next_double_buffer_idx;
        self.double_buffers[next_double_buffer_idx].reversed = next_output_idx < next_input_idx;
        input_v
    }

    // [ ... ]
}
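One way to convince yourself that this logic is correct is to test it on a stripped-down model that only tracks dataset indices. The `Model` type below is a hypothetical simplification written for this purpose: it assumes that each regular update flips the `reversed` flag of the current double buffer, and it replaces descriptor sets with `(input, output)` index pairs.

```rust
/// Index-only model of the triple buffering logic (hypothetical sketch)
struct Model {
    double_buffer_idx: usize,
    reversed: [bool; 3],
    /// Pending transitional step, as an (input, output) dataset index pair
    transitional: Option<(usize, usize)>,
}

impl Model {
    fn new() -> Self {
        Self { double_buffer_idx: 0, reversed: [false; 3], transitional: None }
    }

    /// (input, output) dataset indices of the next step of a double buffer
    fn step_of(&self, double_buffer_idx: usize) -> (usize, usize) {
        let (low, high) = match double_buffer_idx {
            0 => (1, 2),
            1 => (0, 2),
            2 => (0, 1),
            _ => unreachable!("there are only three double buffers"),
        };
        if self.reversed[double_buffer_idx] { (high, low) } else { (low, high) }
    }

    /// Lock the current input dataset and return its index
    fn lock_current_v(&mut self) -> usize {
        // Edge case: already locked, no update since the last call
        if self.transitional.is_some() {
            return self.double_buffer_idx;
        }
        let (input, output) = self.step_of(self.double_buffer_idx);
        self.transitional = Some((input, output));
        // Switch to the double buffer that does not use the locked dataset
        let next_input = output;
        let next_output = 3 - input - next_input;
        self.double_buffer_idx = input;
        self.reversed[input] = next_output < next_input;
        input
    }

    /// Perform one simulation step, return its (input, output) indices
    fn update(&mut self) -> (usize, usize) {
        if let Some(step) = self.transitional.take() {
            return step;
        }
        let idx = self.double_buffer_idx;
        let step = self.step_of(idx);
        // Assumption: a regular update flips the double buffer's direction
        self.reversed[idx] = !self.reversed[idx];
        step
    }
}
```

With such a model, one can check two properties after a call to `lock_current_v()`: every subsequent step reads from the dataset that the previous step wrote to, and no step ever writes to the locked dataset.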

VBuffers changes

To be able to use the new version of Concentrations, a couple of changes to the VBuffers::schedule_download_and_flip() method are required:

  • This method must now receive &mut Concentrations, not just &Concentrations, as it now needs to modify the Concentrations triple buffering configuration using lock_current_v() instead of simply querying its current input buffer for the V concentration.
  • …and the output of lock_current_v() tells it what the current V input buffer is, which is what it needs to initiate the GPU-to-CPU download.

Overall, the new implementation looks like this:

pub fn schedule_download_and_flip(
    &mut self,
    source: &mut Concentrations,
    cmdbuild: &mut CommandBufferBuilder,
) -> Result<()> {
    cmdbuild.copy_buffer(CopyBufferInfo::buffers(
        source.lock_current_v(),
        self.current_buffer().clone(),
    ))?;
    self.current_is_1 = !self.current_is_1;
    Ok(())
}

Simulation runner changes

Now that our data management infrastructure is ready for concurrent computations and GPU-to-CPU downloads, the last part of the work is to update our top-level simulation scheduling logic so that it actually uses a dedicated data transfer queue (if available) to run simulation steps and GPU-to-CPU data transfers concurrently.

To do this, we must first generalize our command buffer builder constructor so that it can work with any queue, not just the compute queue.

fn command_buffer_builder(context: &Context, queue: &Queue) -> Result<CommandBufferBuilder> {
    let cmdbuild = CommandBufferBuilder::primary(
        context.comm_allocator.clone(),
        queue.queue_family_index(),
        CommandBufferUsage::OneTimeSubmit,
    )?;
    Ok(cmdbuild)
}

Then we modify the definition of SimulationRunner to clarify that its internal command buffer builder is destined to host compute queue operations only…

/// State of the simulation
struct SimulationRunner<'run_simulation, ProcessV> {
    // [ ... members other than "cmdbuild" are unchanged ... ]

    /// Next command buffer to be executed on the compute queue
    compute_cmdbuild: CommandBufferBuilder,
}

…which will lead to some mechanical renamings inside of the SimulationRunner constructor:

impl<'run_simulation, ProcessV> SimulationRunner<'run_simulation, ProcessV>
where
    ProcessV: FnMut(ArrayView2<Float>) -> Result<()>,
{
    /// Set up the simulation
    fn new(
        options: &'run_simulation RunnerOptions,
        context: &'run_simulation Context,
        process_v: Option<ProcessV>,
    ) -> Result<Self> {
        // Set up the compute pipelines
        let pipelines = Pipelines::new(options, context)?;

        // Set up the initial command buffer builder
        let mut compute_cmdbuild = command_buffer_builder(context, &context.compute_queue)?;

        // Set up chemical concentrations storage and schedule its initialization
        let concentrations = Concentrations::create_and_schedule_init(
            options,
            context,
            &pipelines,
            &mut compute_cmdbuild,
        )?;

        // Set up the logic for post-processing V concentration, if enabled
        let output_handler = if let Some(process_v) = process_v {
            Some(OutputHandler {
                v_buffers: VBuffers::new(options, context)?,
                process_v,
            })
        } else {
            None
        };

        // We're now ready to perform simulation steps
        Ok(Self {
            options,
            context,
            pipelines,
            concentrations,
            output_handler,
            compute_cmdbuild,
        })
    }

    // [ ... ]
}

We then extract some of the code of the schedule_next_output() method into a new submit_compute() utility method which is in charge of scheduling execution of this command buffer builder on the compute queue…

impl<'run_simulation, ProcessV> SimulationRunner<'run_simulation, ProcessV>
where
    ProcessV: FnMut(ArrayView2<Float>) -> Result<()>,
{
    // [ ... ]

    /// Submit the internal compute command buffer to the compute queue
    fn submit_compute(&mut self) -> Result<CommandBufferExecFuture<impl GpuFuture + 'static>> {
        // Extract the old command buffer builder, replacing it with a blank one
        let old_cmdbuild = std::mem::replace(
            &mut self.compute_cmdbuild,
            command_buffer_builder(self.context, &self.context.compute_queue)?,
        );

        // Submit the resulting compute commands to the compute queue
        let future = old_cmdbuild
            .build()?
            .execute(self.context.compute_queue.clone())?;
        Ok(future)
    }

    // [ ... ]
}
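The `std::mem::replace()` call is what lets us move the builder out of `&mut self` without leaving the struct in an invalid state. Here is a minimal sketch of this pattern, using hypothetical `Builder` and `Runner` types in which a list of command names stands in for a real command buffer builder.

```rust
/// Hypothetical stand-in for a command buffer builder, which merely
/// collects command names
#[derive(Debug, Default)]
struct Builder {
    commands: Vec<String>,
}

/// Hypothetical stand-in for `SimulationRunner`
struct Runner {
    builder: Builder,
}

impl Runner {
    /// Record a command into the current builder
    fn record(&mut self, command: &str) {
        self.builder.commands.push(command.to_owned());
    }

    /// Hand over the recorded commands, leaving a blank builder in place
    fn submit(&mut self) -> Vec<String> {
        // replace() writes the new value and returns the old one, so
        // `self.builder` is never left uninitialized
        let old = std::mem::replace(&mut self.builder, Builder::default());
        old.commands
    }
}
```

In this toy version `std::mem::take()` would work just as well, since `Builder` implements `Default`. The explicit `replace()` form matches the real code, where constructing a fresh builder is a fallible operation that needs the context and queue as arguments.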

…and we use that to implement a new version of schedule_next_output() that handles all possible configurations with minimal code duplication:

  • User requested GPU-to-CPU downloads and…
    • …a dedicated transfer queue is available.
    • …data transfers should run on the same compute queue as other commands.
  • No GPU-to-CPU downloads were requested, only simulation steps should be scheduled.

In terms of code, it looks like this:

impl<'run_simulation, ProcessV> SimulationRunner<'run_simulation, ProcessV>
where
    ProcessV: FnMut(ArrayView2<Float>) -> Result<()>,
{
    // [ ... ]

    /// Submit a GPU job that will produce the next simulation output
    fn schedule_next_output(&mut self) -> Result<FenceSignalFuture<impl GpuFuture + 'static>> {
        // Schedule a number of simulation steps
        schedule_simulation(
            self.options,
            &self.pipelines,
            &mut self.concentrations,
            &mut self.compute_cmdbuild,
        )?;

        // If the user requested that the output be downloaded...
        let future = if let Some(handler) = &mut self.output_handler {
            // ...then we must add the associated data transfer command to a
            // command buffer, that will later be submitted to a queue
            let mut schedule_transfer = |cmdbuild: &mut CommandBufferBuilder| {
                handler
                    .v_buffers
                    .schedule_download_and_flip(&mut self.concentrations, cmdbuild)
            };

            // If we have access to a dedicated DMA queue...
            if let Some(transfer_queue) = &self.context.transfer_queue {
                // ...then build a dedicated data transfer command buffer.
                let mut transfer_cmdbuild = command_buffer_builder(self.context, transfer_queue)?;
                schedule_transfer(&mut transfer_cmdbuild)?;
                let transfer_cmdbuf = transfer_cmdbuild.build()?;

                // Schedule the compute commands to execute on the compute
                // queue, then signal a semaphore, which will finally start the
                // execution of the data transfer on the DMA queue
                self.submit_compute()?
                    .then_signal_semaphore()
                    .then_execute(transfer_queue.clone(), transfer_cmdbuf)?
                    .boxed()
            } else {
                // If there is no dedicated DMA queue, make the data transfer
                // execute on the compute queue after the simulation steps.
                schedule_transfer(&mut self.compute_cmdbuild)?;
                self.submit_compute()?.boxed()
            }
        } else {
            // If there is no data transfer, then submit the compute commands
            // to the compute queue without any extra commands.
            self.submit_compute()?.boxed()
        };

        // Ask for a fence to be signaled once everything is done
        Ok(future.then_signal_fence_and_flush()?)
    }

    // [ ... ]
}

…and that’s it. No changes to SimulationRunner::process_output() or to the top-level run_simulation() entry point are necessary.

One thing worth pointing out above is the use of the boxed() adapter method from the GpuFuture trait of vulkano. This method turns our concrete GpuFuture implementations (which are arbitrarily complex type-level expressions) into a single dynamically dispatched and heap-allocated Box<dyn GpuFuture> type, so that all branches of our complex conditional logic return an object of the same type, which is required in a statically typed programming language like Rust.

If the compiler does not manage to optimize this out, it will result in some slight runtime overhead, along the lines of what is experienced when using class-based polymorphism in object-oriented languages like C++ and Java. But by the usual argument that these overheads should be rather small compared to those of calling a complex GPU API like Vulkan, we will ignore them for now, until profiling of our CPU usage eventually tells us to do otherwise.
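The type unification problem that boxed() solves can be reproduced without any GPU code. The sketch below uses a hypothetical `Job` trait as a stand-in for GpuFuture, with a `boxed()` method modeled after the one discussed above. The `Compute`, `ThenTransfer` and `schedule()` names are illustrative assumptions, not vulkano API.

```rust
/// Hypothetical stand-in for the `GpuFuture` trait
trait Job {
    fn describe(&self) -> String;

    /// Erase the concrete type behind a heap-allocated trait object
    fn boxed(self) -> Box<dyn Job>
    where
        Self: Sized + 'static,
    {
        Box::new(self)
    }
}

/// A plain compute submission
struct Compute;

/// A job followed by a data transfer. The type of the whole chain depends
/// on the type of the inner job, as with chained GPU futures
struct ThenTransfer<J>(J);

impl Job for Compute {
    fn describe(&self) -> String {
        "compute".to_owned()
    }
}

impl<J: Job> Job for ThenTransfer<J> {
    fn describe(&self) -> String {
        format!("{} -> transfer", self.0.describe())
    }
}

/// Both branches have different concrete types (`ThenTransfer<Compute>`
/// and `Compute`), so they must be boxed to share a single return type
fn schedule(download: bool) -> Box<dyn Job> {
    if download {
        ThenTransfer(Compute).boxed()
    } else {
        Compute.boxed()
    }
}
```

Removing the two `boxed()` calls and returning `impl Job` instead would be rejected by the compiler, because an `impl Trait` return type must resolve to one concrete type across all branches.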

Exercise

Implement the above changes in your Gray-Scott simulation implementation, and benchmark the effect on performance.

You should expect a mixed bag of performance improvements and regressions, depending on which hardware you are running on (availability/performance of dedicated DMA queues) and how much your simulation configuration is bottlenecked by GPU-to-CPU data transfers.


  1. It should be noted that by allowing tasks on a command queue to overlap, the Vulkan specification technically allows implementations to perform this optimization automatically. But at the time of writing, in the author’s experience, popular Vulkan implementations do not reliably perform this optimization. So we still need to give the hardware some manual pipelining help in order to get portable performance across all Vulkan-supported systems.