163 lines
4.5 KiB
C
163 lines
4.5 KiB
C
|
#pragma once
|
||
|
|
||
|
#include <c10/core/Stream.h>
|
||
|
#include <c10/xpu/XPUFunctions.h>
|
||
|
|
||
|
namespace c10::xpu {
|
||
|
|
||
|
/*
|
||
|
* Note [Stream Management]
|
||
|
*
|
||
|
* An XPUStream is an abstraction of an actual SYCL queue in which SYCL kernels
|
||
|
* can execute. Currently, there are several pools per device to manage SYCL
|
||
|
* queues, and a device's pool is lazily created.
|
||
|
*
|
||
|
* There are two pools per device. The first pool contains "normal priority"
|
||
|
* queues. The second pool is the "high priority" queues. There are 32 queues
|
||
|
* per pool per device, and when a queue is requested one of these queues is
|
||
|
* returned round-robin. That is, the first queue requested is at index 0, the
|
||
|
* second at index 1... to index 31, then index 0 again.
|
||
|
*
|
||
|
* This means that if 33 queues are requested, the first and last queues
|
||
|
* requested are actually the same queue (under the covers) and kernels enqueued
|
||
|
* on them cannot run concurrently.
|
||
|
*
|
||
|
* It is safe to enqueue a kernel on the same queue from two different
|
||
|
* threads, as described by the SYCL specification.
|
||
|
*/
|
||
|
|
||
|
// Compile-time count of stream priority levels.
// XPUStream::priority_range() derives its second bound from this value.
static constexpr int max_compile_time_stream_priorities = 2;
|
||
|
|
||
|
/*
|
||
|
* This serves as a wrapper around c10::Stream and acts as a representation for
|
||
|
* a SYCL queue. On each device, a SYCL queue pool consists of kStreamsPerPool
|
||
|
* queues, and you can access a particular queue by its index. The index is
|
||
|
* extracted from XPUStream.id().
|
||
|
*/
|
||
|
class C10_XPU_API XPUStream {
|
||
|
public:
|
||
|
enum Unchecked { UNCHECKED };
|
||
|
|
||
|
// Construct a XPUStream from a Stream. This construction is checked, and
|
||
|
// will raise an error if the Stream is not, in fact, a XPU stream.
|
||
|
explicit XPUStream(Stream stream) : stream_(stream) {
|
||
|
TORCH_CHECK(stream_.device_type() == DeviceType::XPU);
|
||
|
}
|
||
|
|
||
|
// Construct a XPUStream from a Stream with no error checking.
|
||
|
explicit XPUStream(Unchecked, Stream stream) : stream_(stream) {}
|
||
|
|
||
|
bool operator==(const XPUStream& other) const noexcept {
|
||
|
return unwrap() == other.unwrap();
|
||
|
}
|
||
|
|
||
|
bool operator!=(const XPUStream& other) const noexcept {
|
||
|
return unwrap() != other.unwrap();
|
||
|
}
|
||
|
|
||
|
operator sycl::queue&() const {
|
||
|
return queue();
|
||
|
}
|
||
|
|
||
|
operator Stream() const {
|
||
|
return unwrap();
|
||
|
}
|
||
|
|
||
|
DeviceType device_type() const {
|
||
|
return DeviceType::XPU;
|
||
|
}
|
||
|
|
||
|
DeviceIndex device_index() const {
|
||
|
return stream_.device_index();
|
||
|
}
|
||
|
|
||
|
Device device() const {
|
||
|
return Device(DeviceType::XPU, device_index());
|
||
|
}
|
||
|
|
||
|
// Return the stream ID corresponding to this particular stream. StreamId is
|
||
|
/// a int64_t representation generated by its type and index.
|
||
|
StreamId id() const {
|
||
|
return stream_.id();
|
||
|
}
|
||
|
|
||
|
bool query() const {
|
||
|
return queue().ext_oneapi_empty();
|
||
|
}
|
||
|
|
||
|
void synchronize() const {
|
||
|
queue().wait_and_throw();
|
||
|
}
|
||
|
|
||
|
int priority() const;
|
||
|
|
||
|
// Explicit conversion to sycl::queue&.
|
||
|
sycl::queue& queue() const;
|
||
|
|
||
|
Stream unwrap() const {
|
||
|
return stream_;
|
||
|
}
|
||
|
|
||
|
struct c10::StreamData3 pack3() const {
|
||
|
return stream_.pack3();
|
||
|
}
|
||
|
|
||
|
static XPUStream unpack3(
|
||
|
StreamId stream_id,
|
||
|
DeviceIndex device_index,
|
||
|
DeviceType device_type) {
|
||
|
return XPUStream(Stream::unpack3(stream_id, device_index, device_type));
|
||
|
}
|
||
|
|
||
|
static std::tuple<int, int> priority_range() {
|
||
|
return std::make_tuple(0, -max_compile_time_stream_priorities + 1);
|
||
|
}
|
||
|
|
||
|
private:
|
||
|
Stream stream_;
|
||
|
};
|
||
|
|
||
|
/**
|
||
|
* Get a stream from the pool in a round-robin fashion.
|
||
|
*
|
||
|
* You can request a stream from the high priority pool by setting
|
||
|
* isHighPriority to true, or a stream for a specific device by setting
|
||
|
* device.
|
||
|
*/
|
||
|
C10_XPU_API XPUStream
|
||
|
getStreamFromPool(const bool isHighPriority = false, DeviceIndex device = -1);
|
||
|
// A lower priority number means a higher priority.
|
||
|
C10_XPU_API XPUStream
|
||
|
getStreamFromPool(const int priority, DeviceIndex device = -1);
|
||
|
|
||
|
/**
|
||
|
* Get the current XPU stream, for the passed XPU device, or for the current
|
||
|
* device if no device index is passed.
|
||
|
*/
|
||
|
C10_XPU_API XPUStream getCurrentXPUStream(DeviceIndex device = -1);
|
||
|
|
||
|
/**
|
||
|
* Set the current stream on the device of the passed in stream to be the passed
|
||
|
* in stream.
|
||
|
*/
|
||
|
C10_XPU_API void setCurrentXPUStream(XPUStream stream);
|
||
|
|
||
|
// Stream-insertion overload for XPUStream. Only the declaration is visible
// here, so the exact output format is determined by the definition elsewhere.
C10_XPU_API std::ostream& operator<<(std::ostream& stream, const XPUStream& s);
|
||
|
|
||
|
/**
|
||
|
* Block all reserved SYCL queues in the stream pools on the device, and wait
|
||
|
* for their synchronizations.
|
||
|
*/
|
||
|
C10_XPU_API void syncStreamsOnDevice(DeviceIndex device = -1);
|
||
|
|
||
|
} // namespace c10::xpu
|
||
|
|
||
|
namespace std {
|
||
|
template <>
|
||
|
struct hash<c10::xpu::XPUStream> {
|
||
|
size_t operator()(c10::xpu::XPUStream s) const noexcept {
|
||
|
return std::hash<c10::Stream>{}(s.unwrap());
|
||
|
}
|
||
|
};
|
||
|
} // namespace std
|