1use std::fmt;
2use std::fs::File;
3use std::future::Future;
4#[cfg(feature = "multipart")]
5use std::io::Cursor;
6use std::io::{self, Read};
7use std::mem;
8use std::ptr;
9
10use bytes::buf::UninitSlice;
11use bytes::Bytes;
12use futures_channel::mpsc;
13
14use crate::async_impl;
15
/// A request body.
///
/// The payload is stored as a private `Kind`: either a boxed reader
/// (optionally paired with a length) or a buffer of bytes held in memory.
#[derive(Debug)]
pub struct Body {
    // The underlying storage; see `Kind` for the two possible shapes.
    kind: Kind,
}
27
28impl Body {
29 pub fn new<R: Read + Send + 'static>(reader: R) -> Body {
62 Body {
63 kind: Kind::Reader(Box::from(reader), None),
64 }
65 }
66
67 pub fn sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body {
82 Body {
83 kind: Kind::Reader(Box::from(reader), Some(len)),
84 }
85 }
86
87 pub fn as_bytes(&self) -> Option<&[u8]> {
90 match self.kind {
91 Kind::Reader(_, _) => None,
92 Kind::Bytes(ref bytes) => Some(bytes.as_ref()),
93 }
94 }
95
96 pub fn buffer(&mut self) -> Result<&[u8], crate::Error> {
103 match self.kind {
104 Kind::Reader(ref mut reader, maybe_len) => {
105 let mut bytes = if let Some(len) = maybe_len {
106 Vec::with_capacity(len as usize)
107 } else {
108 Vec::new()
109 };
110 io::copy(reader, &mut bytes).map_err(crate::error::builder)?;
111 self.kind = Kind::Bytes(bytes.into());
112 self.buffer()
113 }
114 Kind::Bytes(ref bytes) => Ok(bytes.as_ref()),
115 }
116 }
117
118 #[cfg(feature = "multipart")]
119 pub(crate) fn len(&self) -> Option<u64> {
120 match self.kind {
121 Kind::Reader(_, len) => len,
122 Kind::Bytes(ref bytes) => Some(bytes.len() as u64),
123 }
124 }
125
126 #[cfg(feature = "multipart")]
127 pub(crate) fn into_reader(self) -> Reader {
128 match self.kind {
129 Kind::Reader(r, _) => Reader::Reader(r),
130 Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)),
131 }
132 }
133
134 pub(crate) fn into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>) {
135 match self.kind {
136 Kind::Reader(read, len) => {
137 let (tx, rx) = mpsc::channel(0);
138 let tx = Sender {
139 body: (read, len),
140 tx,
141 };
142 (Some(tx), async_impl::Body::stream(rx), len)
143 }
144 Kind::Bytes(chunk) => {
145 let len = chunk.len() as u64;
146 (None, async_impl::Body::reusable(chunk), Some(len))
147 }
148 }
149 }
150
151 pub(crate) fn try_clone(&self) -> Option<Body> {
152 self.kind.try_clone().map(|kind| Body { kind })
153 }
154}
155
/// Internal storage behind a `Body`.
enum Kind {
    /// A boxed reader paired with its length, when one was recorded.
    Reader(Box<dyn Read + Send>, Option<u64>),
    /// Bytes already held in memory.
    Bytes(Bytes),
}
160
161impl Kind {
162 fn try_clone(&self) -> Option<Kind> {
163 match self {
164 Kind::Reader(..) => None,
165 Kind::Bytes(v) => Some(Kind::Bytes(v.clone())),
166 }
167 }
168}
169
170impl From<Vec<u8>> for Body {
171 #[inline]
172 fn from(v: Vec<u8>) -> Body {
173 Body {
174 kind: Kind::Bytes(v.into()),
175 }
176 }
177}
178
179impl From<String> for Body {
180 #[inline]
181 fn from(s: String) -> Body {
182 s.into_bytes().into()
183 }
184}
185
186impl From<&'static [u8]> for Body {
187 #[inline]
188 fn from(s: &'static [u8]) -> Body {
189 Body {
190 kind: Kind::Bytes(Bytes::from_static(s)),
191 }
192 }
193}
194
195impl From<&'static str> for Body {
196 #[inline]
197 fn from(s: &'static str) -> Body {
198 s.as_bytes().into()
199 }
200}
201
202impl From<File> for Body {
203 #[inline]
204 fn from(f: File) -> Body {
205 let len = f.metadata().map(|m| m.len()).ok();
206 Body {
207 kind: Kind::Reader(Box::new(f), len),
208 }
209 }
210}
211impl From<Bytes> for Body {
212 #[inline]
213 fn from(b: Bytes) -> Body {
214 Body {
215 kind: Kind::Bytes(b),
216 }
217 }
218}
219
220impl fmt::Debug for Kind {
221 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
222 match *self {
223 Kind::Reader(_, ref v) => f
224 .debug_struct("Reader")
225 .field("length", &DebugLength(v))
226 .finish(),
227 Kind::Bytes(ref v) => fmt::Debug::fmt(v, f),
228 }
229 }
230}
231
/// Debug adapter for an optional length: prints the number when present and
/// the text `Unknown` otherwise.
struct DebugLength<'a>(&'a Option<u64>);

impl<'a> fmt::Debug for DebugLength<'a> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        if let Some(len) = self.0 {
            // Delegate so the number honours the caller's formatter flags.
            fmt::Debug::fmt(len, f)
        } else {
            f.write_str("Unknown")
        }
    }
}
242
/// A readable view of a body, produced by `Body::into_reader`.
#[cfg(feature = "multipart")]
pub(crate) enum Reader {
    /// A boxed reader taken from a reader-backed body.
    Reader(Box<dyn Read + Send>),
    /// In-memory bytes wrapped in a cursor so they can be read sequentially.
    Bytes(Cursor<Bytes>),
}
248
/// Forwards reads to whichever source the variant wraps.
#[cfg(feature = "multipart")]
impl Read for Reader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self {
            Reader::Bytes(cursor) => cursor.read(buf),
            Reader::Reader(inner) => inner.read(buf),
        }
    }
}
258
/// The reading half of a reader-backed body, created by `Body::into_async`.
///
/// It owns the reader and the channel through which chunks are delivered.
pub(crate) struct Sender {
    // The boxed reader and its length, when one was recorded.
    body: (Box<dyn Read + Send>, Option<u64>),
    // Sending half of the channel; carries chunks, or `Abort` on read failure.
    tx: mpsc::Sender<Result<Bytes, Abort>>,
}
263
/// Marker error placed on the body channel when reading the body fails.
#[derive(Debug)]
struct Abort;

impl fmt::Display for Abort {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "abort request body")
    }
}

impl std::error::Error for Abort {}
274
275async fn send_future(sender: Sender) -> Result<(), crate::Error> {
276 use bytes::{BufMut, BytesMut};
277 use futures_util::SinkExt;
278 use std::cmp;
279
280 let con_len = sender.body.1;
281 let cap = cmp::min(sender.body.1.unwrap_or(8192), 8192);
282 let mut written = 0;
283 let mut buf = BytesMut::with_capacity(cap as usize);
284 let mut body = sender.body.0;
285 let mut tx = Some(sender.tx);
287
288 loop {
289 if Some(written) == con_len {
290 return Ok(());
292 }
293
294 if buf.is_empty() {
308 if buf.remaining_mut() == 0 {
309 buf.reserve(8192);
310 let uninit = buf.chunk_mut();
312 unsafe {
313 ptr::write_bytes(uninit.as_mut_ptr(), 0, uninit.len());
314 }
315 }
316
317 let bytes = unsafe { mem::transmute::<&mut UninitSlice, &mut [u8]>(buf.chunk_mut()) };
318 match body.read(bytes) {
319 Ok(0) => {
320 return Ok(());
323 }
324 Ok(n) => unsafe {
325 buf.advance_mut(n);
326 },
327 Err(e) => {
328 let _ = tx
329 .take()
330 .expect("tx only taken on error")
331 .clone()
332 .try_send(Err(Abort));
333 return Err(crate::error::body(e));
334 }
335 }
336 }
337
338 let buf_len = buf.len() as u64;
342 tx.as_mut()
343 .expect("tx only taken on error")
344 .send(Ok(buf.split().freeze()))
345 .await
346 .map_err(crate::error::body)?;
347
348 written += buf_len;
349 }
350}
351
impl Sender {
    /// Consumes the sender and returns the future that forwards its reader's
    /// contents through the channel (see `send_future`).
    pub(crate) fn send(self) -> impl Future<Output = Result<(), crate::Error>> {
        send_future(self)
    }
}
359
/// Test helper: consumes `body` and returns its full contents as a `String`.
///
/// Fails with the underlying I/O error, which includes the case where the
/// contents are not valid UTF-8.
#[cfg(test)]
pub(crate) fn read_to_string(mut body: Body) -> io::Result<String> {
    let mut text = String::new();
    match body.kind {
        Kind::Reader(ref mut reader, _) => {
            reader.read_to_string(&mut text)?;
        }
        Kind::Bytes(ref bytes) => {
            // A byte slice is itself a reader; reading advances the slice.
            let mut remaining: &[u8] = bytes;
            remaining.read_to_string(&mut text)?;
        }
    }
    Ok(text)
}
369}