XPUStream.h 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. #pragma once
  2. #include <c10/core/Stream.h>
  3. #include <c10/core/impl/GPUTrace.h>
  4. #include <c10/xpu/XPUFunctions.h>
  5. namespace c10::xpu {
  6. /*
  7. * Note [Stream Management]
  8. *
  9. * An XPUStream is an abstraction of an actual SYCL queue in which SYCL kernel
  10. * can execute. Currently, there are several pools per device to manage SYCL
  11. * queue, and a device's pool is lazily created.
  12. *
  13. * There are two pools per device. The first pool contains "normal priority"
  14. * queues. The second pool is the "high priority" queues. There are 32 queues in
  15. * per pool per device, and when a queue is requested one of these queues is
  16. * returned round-robin. That is, the first queue requested is at index 0, the
  17. * second at index 1... to index 31, then index 0 again.
  18. *
  19. * This means that if 33 queues are requested, the first and last queues
  20. * requested are actually the same queue (under the covers) and kernels enqueued
  21. * on them cannot run concurrently.
  22. *
  23. * It is safe to enqueue a kernel on the same queue from two different
  24. * threads as the SYCL specification described.
  25. */
  26. static constexpr int max_compile_time_stream_priorities = 3;
  27. /*
  28. * This serves as a wrapper around c10::Stream and acts as a representation for
  29. * a SYCL queue, which allows asynchronous execution of XPU tasks.
  30. */
  31. class C10_XPU_API XPUStream {
  32. public:
  33. enum Unchecked { UNCHECKED };
  34. /// Construct a XPUStream from a Stream. This construction is checked, and
  35. /// will raise an error if the Stream is not, in fact, a XPU stream.
  36. explicit XPUStream(Stream stream) : stream_(stream) {
  37. TORCH_CHECK(stream_.device_type() == DeviceType::XPU);
  38. }
  39. /// Construct a XPUStream from a Stream with no error checking.
  40. explicit XPUStream(Unchecked, Stream stream) : stream_(stream) {}
  41. bool operator==(const XPUStream& other) const noexcept {
  42. return unwrap() == other.unwrap();
  43. }
  44. bool operator!=(const XPUStream& other) const noexcept {
  45. return unwrap() != other.unwrap();
  46. }
  47. /// Implicit conversion to sycl::queue&.
  48. operator sycl::queue&() const {
  49. return queue();
  50. }
  51. /// Implicit conversion to sycl::queue*.
  52. operator sycl::queue*() const {
  53. return &queue();
  54. }
  55. /// Implicit conversion to Stream (a.k.a., forget that the stream is a
  56. /// XPU stream).
  57. operator Stream() const {
  58. return unwrap();
  59. }
  60. /// Get the XPU device type that this stream is associated with.
  61. DeviceType device_type() const {
  62. return DeviceType::XPU;
  63. }
  64. /// Get the XPU device index that this stream is associated with.
  65. DeviceIndex device_index() const {
  66. return stream_.device_index();
  67. }
  68. /// Get the full Device that this stream is associated with. The Device is
  69. /// guaranteed to be a XPU device.
  70. Device device() const {
  71. return Device(DeviceType::XPU, device_index());
  72. }
  73. /// Return the stream ID corresponding to this particular stream. StreamId is
  74. /// a int64_t representation generated by its type and index.
  75. StreamId id() const {
  76. return stream_.id();
  77. }
  78. /// Return true if all enqueued tasks in this stream have been completed,
  79. /// otherwise return false.
  80. bool query() const {
  81. return queue().ext_oneapi_empty();
  82. }
  83. /// Performs a blocking wait for the completion of all enqueued tasks in this
  84. /// stream.
  85. void synchronize() const {
  86. queue().wait_and_throw();
  87. const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
  88. if (C10_UNLIKELY(interp)) {
  89. (*interp)->trace_gpu_stream_synchronization(
  90. c10::kXPU, reinterpret_cast<uintptr_t>(&queue()));
  91. }
  92. }
  93. /// Return the priority that this stream is associated with. Lower numbers
  94. /// represent higher priority.
  95. int priority() const;
  96. /// Explicit conversion to sycl::queue&.
  97. sycl::queue& queue() const;
  98. /// Explicit conversion to Stream.
  99. Stream unwrap() const {
  100. return stream_;
  101. }
  102. /// Reversibly pack a XPUStream into a struct representation. The XPUStream
  103. /// can be unpacked using unpack3().
  104. struct c10::StreamData3 pack3() const {
  105. return stream_.pack3();
  106. }
  107. /// Unpack a XPUStream from the 3 fields generated by pack3().
  108. static XPUStream unpack3(
  109. StreamId stream_id,
  110. DeviceIndex device_index,
  111. DeviceType device_type) {
  112. return XPUStream(Stream::unpack3(stream_id, device_index, device_type));
  113. }
  114. /// Return the range of priority **supported by PyTorch**.
  115. static std::tuple<int, int> priority_range() {
  116. // See Note [XPU Stream priorities]
  117. return std::make_tuple(1, -max_compile_time_stream_priorities + 2);
  118. }
  119. private:
  120. Stream stream_;
  121. };
  122. /**
  123. * Get a stream from the pool in a round-robin fashion.
  124. *
  125. * You can request a stream from the highest priority pool by setting
  126. * isHighPriority to true for a specific device.
  127. */
  128. C10_XPU_API XPUStream
  129. getStreamFromPool(const bool isHighPriority = false, DeviceIndex device = -1);
  130. /**
  131. * Get a stream from the pool in a round-robin fashion.
  132. *
  133. * You can request a stream by setting a priority value for a specific device.
  134. * The priority number lower, the priority higher.
  135. */
  136. C10_XPU_API XPUStream
  137. getStreamFromPool(const int priority, DeviceIndex device = -1);
  138. /**
  139. * Get an XPUStream from an external SYCL queue.
  140. *
  141. * This function allows interoperability with other libraries by enabling
  142. * the use of an external SYCL queue that was not created by PyTorch. This
  143. * can be useful for data exchange or other operations where integration
  144. * with non-PyTorch queues is required.
  145. *
  146. * NOTE: It is the user's responsibility to ensure that the referenced SYCL
  147. * queue remains alive while the corresponding XPUStream, or any c10::Stream
  148. * derived from it, is in use. The different SYCL queue pointers will result in
  149. * distinct XPUStream instances, even if the SYCL queues they dereference are
  150. * equivalent.
  151. */
  152. C10_XPU_API XPUStream
  153. getStreamFromExternal(sycl::queue* ext_queue, DeviceIndex device_index);
  154. /**
  155. * Get the current XPU stream, for the passed XPU device, or for the current
  156. * device if no device index is passed.
  157. */
  158. C10_XPU_API XPUStream getCurrentXPUStream(DeviceIndex device = -1);
  159. /**
  160. * Set the current stream on the device of the passed in stream to be the passed
  161. * in stream.
  162. */
  163. C10_XPU_API void setCurrentXPUStream(XPUStream stream);
  164. C10_XPU_API std::ostream& operator<<(std::ostream& stream, const XPUStream& s);
  165. /**
  166. * Block all reserved SYCL queues in the stream pools on the device, and wait
  167. * for their synchronizations.
  168. */
  169. C10_XPU_API void syncStreamsOnDevice(DeviceIndex device = -1);
  170. } // namespace c10::xpu
  171. namespace std {
  172. template <>
  173. struct hash<c10::xpu::XPUStream> {
  174. size_t operator()(c10::xpu::XPUStream s) const noexcept {
  175. return std::hash<c10::Stream>{}(s.unwrap());
  176. }
  177. };
  178. } // namespace std