Click here to Skip to main content
14,771,778 members
Articles » General Programming » Parallel Programming » General
Project
Posted 25 Nov 2020

Tagged as

Stats

1.2K views
16 downloads
2 bookmarked

Queue-like Data Structure for Hierarchy-based Scheduling

Rate me:
Please Sign up or sign in to vote.
5.00/5 (1 vote)
25 Nov 2020CPOL
Data structure that allows items to be scheduled for processing based on the tags that define item hierarchy
The goal of forque data structure is to allow items to be scheduled for processing based on hierarchical tags attached to them. Forque ensures that at most one item is processed at a given time for all tags related to the item’s own tag, while respecting relative order among related items in which they have been reserved in the queue.

Introduction

The purpose of the forque structure is to schedule processing in such way that items processed concurrently do not affect each other. For instance, when we receive stream of updates for a specific data set, where entities in that data set have parent-child relationship, it is possible that we do not want to process more than one update for the same item, or to have updates done on parent and one or more of its children simultaneously.

To achieve this, user augments each enqueued item with a tag which defines three-like hierarchy. Based on these tags, forque ensures that only one item at a time can be processed for the specific sub-tree in the hierarchy.

For example, tags 1/2 and 1/2/3 have parent-child relationship, while tags 1/2/1 and 1/2/3 are not related.

forque structure

Structure of tag can be dynamic or static. Static tags have depth and types of each level defined at compile-time. For dynamic tags, depth is not defined while types can change during run-time and they can be of different type event for sibling tags.

Level: 0 1 2
Type: int string float
Static Tag #1 77 "x" 12.3
Static Tag #2 8 "y"  
Static Tag #3 10    

 

Level: 0 1 2
Dynamic Tag #1 8 "x" 12.3
Dynamic Tag #2 8 7.9  
Dynamic Tag #3 "y" 8  

Production of an item is done in two phases:

  1. Reserving place in the queue
  2. Populating item with actual payload

Reserving place ensures that relative order of related items will be respected, so that time needed to populate item with actual value does not cause unwanted reorderings.

Only populated items will be served to the consumers. All related items ordered after the item will wait for it to be populated, served and consumed before they are offered to the consumer, even if they are populated.

These two phases can be merged and item can be populated at reservation time, but the order will still be respected and the new item will not be offered unless there are no other related items that have to be consumed before it.

Like production, consumption is also done in two phases:

  1. Obtaining populated item
  2. Releasing item from the queue

The two phase process ensures that consumers will not be served with related items concurrently. In the first phase, consumer asks for an available time. After the item is processed, consumer has to notify queue that processing is done, so the next available item with related tag, if any, can be offered.

While the order of related items is respected, no such guarantee exists for unrelated items, which is the whole point of the exercise. :) There is still some control over the order in which available, but unrelated items, are served the consumers:

  1. FIFO - the first item made available gets served first
  2. LIFO - the last item made available gets served first
  3. Priority - available item with highest user-defined priority gets served first

Requirements

Forque library requires support for C++20 from the compiler. Specifically, support for coroutines and concepts. Currently, it is only working with Microsoft Visual C++ 14 compiler.

Using the Forque

Forque is implemented by frq::forque class template which is parameterized by type of the items stored in the queue, type of tags used for scheduling, desired runque type and allocator.

template<typename Ty,
         runlike Runque,
         taglike Tag,
         typename Alloc = std::allocator<Ty>>
class forque;

Runque and Tag template parameters are constrained by runlike and taglike concepts.

frq::forque has a rather simple interface that allows user to do the following:

  1. Reserve place in queue for already produced item with specific tag:
    template<taglike Tag>
    task<> reserve(Tag const& tag, value_type const& value); // (a)
    
    template<taglike Tag>
    task<> reserve(Tag const& tag, value_type&& value);      // (b)
  2. Reserve place in queue for an item with specific tag that will be produced later:
    template<taglike Tag>
    task<reservation_type> reserve(Tag const& tag);          // (c)
  3. Get access to next ready item in runqueue:
    task<retainment_type> get() noexcept;
  4. Stop acceptance of new reservations and processing of existing items:
    task<> interrupt() noexcept;

Producing Items

If item’s value is already known, it can be marked with desired tag and be placed in forque using reserve overloads (a) or (b). Item will be put in ready queue whenever its dependencies are processed. On the other hand, if item’s value is not yet known, but the place has to be reserved to keep desired order of processing when it comes to item’s dependencies, overload (c) can be used. It returns object of reservation<T> type which represents this reservation. Calling release() on this type will set desired value and it will allow item to be put in ready queue. If release() is not called, all dependent items will be stuck in forque and they will not be scheduled for processing.

Consuming Items

get()will return only when there are available items in runqueue, or if the processing is interrupted by throwing interrupted exception. Item is not considered consumed after calling get(). Instead instance of retainment<T> type is returned. Calling value() will yield item’s value and finalize() will mark item as consumed and forque can proceed with processing dependent items. finalize() will not be called automatically by the destructor of retainment<T>, so it’s user’s responsibility to invoke it even in case of exception. If finalize() is not invoked, all dependent items will be stuck in forque and will never be scheduled for processing.

Stopping the Item Processing

Invoking interrupt() will immediately awake any consumers waiting for items and throw interrupted exception within their context. Any subsequent call to reserve() or get() will also result in interrupted exception.

Tags

Library offers the following choice of tag types:

  1. Static tags - frq::stag class template, taking number of elements the tag has, its size, and their types
  2. Dynamic tag - frq::dtag class template, taking only allocator type

All tag types have to satisfy taglike concept which is defined as:

template<typename Ty>
concept taglike = requires(Ty t) {
  typename Ty::size_type;
  typename Ty::storage_type;

  {std::size_t{typename Ty::size_type{}}};

  { t.values() } noexcept -> std::convertible_to<typename Ty::storage_type>;
  { t.size() } noexcept -> std::same_as<typename Ty::size_type>;
};

where

  • size_type is type that can store number of values in the tag and it can be converted to std::size_t without narrowing,
  • storage_type is the type can store all values in the tag,
  • size is a member function that returns number of values in the tag, and
  • values is a member function returning all the values currently stored in tag.

Static Tags

Static tag has its structure, size and type of values at each level, defined at the compile time. Static tag owns values stored in them which means that copying/destroying tag will also copy/destroy stored values.

frq::stag class template represent static tag.

template<std::uint8_t Size, typename... Tys>
class stag;

Size template parameter specifies how many values from the Tys type list will be populated, so it defines "level" of the tag. List of types have to be the same for all tags used by the same queue, no matter the tag level:

Level: 0 1 2
stag<3, int, string, float> 77 "x" 12.3
stag<2, int, string, float> 8 "y"  
stag<1, int, string, float> 10    

frq::stag can be constructed from tuple of fitting size containing defined types, or providing the list of arguments to the constructor which will be used to initialize corresponding values in tag:

template<typename... Txs>
constexpr inline stag(construct_tag_default_t, Txs&&... args);

constexpr inline stag(storage_type const& values);
constexpr inline stag(storage_type&& values);

Size of the tag can be obtained by calling size:size(); tuple containing all values in the tag is returned by values(); and value at the highest available level is returned by key().

storage_type const& values() const noexcept;
size_type size() const noexcept;
auto key() const noexcept;

frq::stag_t type alias provides way to defined stag type at the highest level for specified list of type:

template<typename... Tys>
using stag_t = /* ... */;

Dynamic Tags

Dynamic tags have no restriction on size or type of values at each level. Tag size can increase in run-time and types of values can differ for each level and even for the children of the same parent:

Level: 0 1 1
Dynamic Tag #1 8 "x" 12.3
Dynamic Tag #2 8 7.9  
Dynamic Tag #3 "y" 8  

This is achieved through type erasure. Types that are stored in dynamic tag have to support hashing and equality comparison. Values are shared by the dynamic tags, so tags store only references to the values unlike the static tags.

frq::dtag class template is only parameterized by allocator used for type erasure purposes:

template<typename Alloc = std::allocator<dtag_node>>
class dtag;

Values stored in dynamic tags are wrapped by frq::dtag_node abstract type that does type erasure. frq::dtag_node provides interface for hashing, equality comparison and string formatting of stored values.

class dtag_node {
public:
  virtual ~dtag_node() {
  }

  virtual std::size_t hash() const noexcept = 0;
  virtual bool equal(dtag_node const& other) const noexcept = 0;
  virtual std::string get_string() const = 0;
};

User can interact with values through frq::dtag_value type, which is a regular type providing access to underlying node storing actual value.

class dtag_value {
public:
  dtag_node_ptr::element_type const& node() const;
  std::size_t hash() const noexcept;
  std::string get_string();

  bool operator==(dtag_value const& rhs) const noexcept;
  bool operator!=(dtag_value const& rhs) const noexcept;
};

frq::dtag can be constructed from sequence of dtag_values or by providing list of arguments to the constructor which will initialize corresponding values in the tag. Hashing operation and equality comparison can be provided which is going to be used by constructed dtag_node.

template<tag_iterator Iter>
dtag(allocator_type const& alloc, Iter first, Iter last);

template<tag_iterator Iter>
dtag(Iter first, Iter last);

template<typename HashCmp, typename... Tys>
dtag(allocator_type const& alloc, HashCmp const& hash_cmp, Tys&&... args);

template<typename... Tys>
dtag(construct_tag_alloc_t, allocator_type const& alloc, Tys&&... args);

template<typename HashCmp, typename... Tys>
dtag(construct_tag_hash_cmp_t, HashCmp const& hash_cmp, Tys&&... args);

template<typename... Tys>
dtag(construct_tag_default_t /*unused*/, Tys&&... args);

Dynamic tags offer the similar interface as static tags. They can provide: size(); vector of tag_values containing references to values in the tag: values(); and tag_value at the highest available level: key(). Additional accessor is provided that allows user to get tuple of values stored in tag: pack(). Number and types of template arguments provided to pack function template must match number and types of values stored in the tag, otherwise behavior is undefined.

storage_type const& values() const noexcept;
size_type size() const noexcept;
template<typename... Tys>
auto pack() const;
auto key() const noexcept;

frq::default_hash_compare type, that is used in case user type is not provided during tag construction, forwards hash computation to std::hash<T> and equality comparison to std::equal_to<T>.

struct default_hash_compare {
  template<typename Ty>
  std::size_t hash(Ty const& value) const noexcept;

  template<typename Ty>
  bool equal_to(Ty const& left, Ty const& right) const noexcept;
};

Runques

Runque represents queue of items ready to be served to consumers for processing. Order in which ready items are consumed can be configured by the user. Three options are available:

  1. FIFO - the first item made available gets served first
  2. LIFO - the last item made available gets served first
  3. Priority - available item with highest user-defined priority gets served first

frq::make_runque_t metafunction offers a way to configure desired runque:

template<typename Order,
         typename Mtm,
         typename Ty,
         typename Alloc,
         typename... Rest>
using make_runque_t = /* ... */;
  • Order defines in which order ready items are consumed. One of the following options are available:
    1. fifo_order
    2. lifo_order
    3. priority_order
  • Mtm defines multithreading model. Only coro_thread_model is implemented.
  • Ty is type of items stored in runque.
  • Alloc is obviously allocator used by queue itself.
  • Rest can be used for passing additional parameters to desired runque type. In case of priority_order, it defines LessThen operation passed to underlying priority queue.

Ty parameter is not the same as user type on which forque operates. It is a wrapping type that allows consumer to control item’s lifetime. The type also provides a way to notify forque that item has been processed so it can ready next related item, if available.

Type returned by make_runque_t metafunction will satisfy runlike concept which is defined as:

template<typename Ty>
concept runlike = requires(Ty s) {
  typename Ty::value_type;
  requires runnable<typename Ty::value_type>;
  typename Ty::get_type;

  { s.get() } ->std::same_as<typename Ty::get_type>;
  { s.put(std::declval<typename Ty::value_type&&>()) };
};

where

  • value_type is type of items stored in the queue which needs to satisfy runnable concept,
  • get_type is return type of get() member function that yields next available item,
  • get is a function returning next available item, and
  • put is function adding item to the queue.

runnable concept is rather simple. It only requires non-throwing move constructor:

template<typename Ty>
concept runnable =
  std::is_nothrow_move_constructible_v<Ty> && !std::is_reference_v<Ty>;

User’s only responsibility is to configure desired runque type and to provide it to forque. User will not interact with runque directly.

Example

An example of application use of forque can be found in ./src/app/ directory. It is using dynamic tags with several producers and simple consumers that just outputs consumed items to the console.

The example has a rather simple thread pool implementation that provides context for the execution of consumer’s and producer’s coroutines, but it is out of the scope of this article.

The first thing we do is select tag type and configure runque and forque types:

using tag_type = frq::dtag<>; // dynamic tag

// type returned by 'reserve' call
using reservation_type = frq::reservation<item>;

// type returned by 'get' call
using retainment_type = frq::retainment<item>;

using runque_type = frq::make_runque_t<frq::fifo_order,
                                       frq::coro_thread_model,
                                       retainment_type,
                                       std::allocator<retainment_type>>;

using queue_type = frq::forque<item, runque_type, tag_type>;

generate_tag function gives an example how to generate dynamic tag of arbitrary size:

auto tag_size = tag_size_dist(rng);
while (tag_size-- > 0) {
  auto node = frq::make_dtag_node<int>(std::allocator<frq::dtag_node>{},
                                       frq::default_hash_compare{},
                                       tag_value_dist(rng));

  tag_values.push_back(std::move(node));
}

return tag_type{begin(tag_values), end(tag_values)};

produce function is a coroutine that produces items in the two phases:

auto tag = generate_tag(rng);

// phase 1
auto item = co_await queue.reserve(tag);

// ...
co_await p.yield();
// ...

// phase 2
co_await item.release({tag, value});

In the first phase, we reserve the place for the item in queue. In the second phase, we populate the item with the actual value which will make item available for consumption.

Call to yield actually puts the rest of the coroutine back at the end of the thread pool’s queue. Since production is done in a loop without any waiting, this gives consumers opportunity to start consuming items before we end the loop.

Each producer generates certain number of items and after the last producer has finished the production, it initiates shutdown of the forque:

if (--producers == 0) {
  co_await queue.interrupt();
}

consume function is a coroutine that consumes ready items from the forque:

try {
  // phase 1
  auto item = co_await queue.get();

  // ...
  co_await p.yield();
  // ...

  // phase 2
  co_await item.finalize();
}
catch (frq::interrupted&) {
  break;
}

As with producer, there are two phases. In the first phase, we wait and obtain item from the queue and in the second phase, after the processing is done, item is released.

yield call in the middle simulates asynchronous processing by putting the rest of the coroutine at the end of thread pool’s queue.

Processing is wrapped in try/catch block so it can abort consumption coroutione after the shutdown of the forque.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Mladen Janković
Software Developer
Serbia Serbia
No Biography provided

Comments and Discussions

 
-- There are no messages in this forum --