reqwest/blocking/
body.rs

1use std::fmt;
2use std::fs::File;
3use std::future::Future;
4#[cfg(feature = "multipart")]
5use std::io::Cursor;
6use std::io::{self, Read};
7use std::mem;
8use std::ptr;
9
10use bytes::buf::UninitSlice;
11use bytes::Bytes;
12use futures_channel::mpsc;
13
14use crate::async_impl;
15
16/// The body of a `Request`.
17///
18/// In most cases, this is not needed directly, as the
19/// [`RequestBuilder.body`][builder] method uses `Into<Body>`, which allows
20/// passing many things (like a string or vector of bytes).
21///
22/// [builder]: ./struct.RequestBuilder.html#method.body
23#[derive(Debug)]
24pub struct Body {
25    kind: Kind,
26}
27
28impl Body {
29    /// Instantiate a `Body` from a reader.
30    ///
31    /// # Note
32    ///
33    /// While allowing for many types to be used, these bodies do not have
34    /// a way to reset to the beginning and be reused. This means that when
35    /// encountering a 307 or 308 status code, instead of repeating the
36    /// request at the new location, the `Response` will be returned with
37    /// the redirect status code set.
38    ///
39    /// ```rust
40    /// # use std::fs::File;
41    /// # use reqwest::blocking::Body;
42    /// # fn run() -> Result<(), Box<dyn std::error::Error>> {
43    /// let file = File::open("national_secrets.txt")?;
44    /// let body = Body::new(file);
45    /// # Ok(())
46    /// # }
47    /// ```
48    ///
49    /// If you have a set of bytes, like `String` or `Vec<u8>`, using the
50    /// `From` implementations for `Body` will store the data in a manner
51    /// it can be reused.
52    ///
53    /// ```rust
54    /// # use reqwest::blocking::Body;
55    /// # fn run() -> Result<(), Box<dyn std::error::Error>> {
56    /// let s = "A stringy body";
57    /// let body = Body::from(s);
58    /// # Ok(())
59    /// # }
60    /// ```
61    pub fn new<R: Read + Send + 'static>(reader: R) -> Body {
62        Body {
63            kind: Kind::Reader(Box::from(reader), None),
64        }
65    }
66
67    /// Create a `Body` from a `Read` where the size is known in advance
68    /// but the data should not be fully loaded into memory. This will
69    /// set the `Content-Length` header and stream from the `Read`.
70    ///
71    /// ```rust
72    /// # use std::fs::File;
73    /// # use reqwest::blocking::Body;
74    /// # fn run() -> Result<(), Box<dyn std::error::Error>> {
75    /// let file = File::open("a_large_file.txt")?;
76    /// let file_size = file.metadata()?.len();
77    /// let body = Body::sized(file, file_size);
78    /// # Ok(())
79    /// # }
80    /// ```
81    pub fn sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body {
82        Body {
83            kind: Kind::Reader(Box::from(reader), Some(len)),
84        }
85    }
86
87    /// Returns the body as a byte slice if the body is already buffered in
88    /// memory. For streamed requests this method returns `None`.
89    pub fn as_bytes(&self) -> Option<&[u8]> {
90        match self.kind {
91            Kind::Reader(_, _) => None,
92            Kind::Bytes(ref bytes) => Some(bytes.as_ref()),
93        }
94    }
95
96    /// Converts streamed requests to their buffered equivalent and
97    /// returns a reference to the buffer. If the request is already
98    /// buffered, this has no effect.
99    ///
100    /// Be aware that for large requests this method is expensive
101    /// and may cause your program to run out of memory.
102    pub fn buffer(&mut self) -> Result<&[u8], crate::Error> {
103        match self.kind {
104            Kind::Reader(ref mut reader, maybe_len) => {
105                let mut bytes = if let Some(len) = maybe_len {
106                    Vec::with_capacity(len as usize)
107                } else {
108                    Vec::new()
109                };
110                io::copy(reader, &mut bytes).map_err(crate::error::builder)?;
111                self.kind = Kind::Bytes(bytes.into());
112                self.buffer()
113            }
114            Kind::Bytes(ref bytes) => Ok(bytes.as_ref()),
115        }
116    }
117
118    #[cfg(feature = "multipart")]
119    pub(crate) fn len(&self) -> Option<u64> {
120        match self.kind {
121            Kind::Reader(_, len) => len,
122            Kind::Bytes(ref bytes) => Some(bytes.len() as u64),
123        }
124    }
125
126    #[cfg(feature = "multipart")]
127    pub(crate) fn into_reader(self) -> Reader {
128        match self.kind {
129            Kind::Reader(r, _) => Reader::Reader(r),
130            Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)),
131        }
132    }
133
134    pub(crate) fn into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>) {
135        match self.kind {
136            Kind::Reader(read, len) => {
137                let (tx, rx) = mpsc::channel(0);
138                let tx = Sender {
139                    body: (read, len),
140                    tx,
141                };
142                (Some(tx), async_impl::Body::stream(rx), len)
143            }
144            Kind::Bytes(chunk) => {
145                let len = chunk.len() as u64;
146                (None, async_impl::Body::reusable(chunk), Some(len))
147            }
148        }
149    }
150
151    pub(crate) fn try_clone(&self) -> Option<Body> {
152        self.kind.try_clone().map(|kind| Body { kind })
153    }
154}
155
156enum Kind {
157    Reader(Box<dyn Read + Send>, Option<u64>),
158    Bytes(Bytes),
159}
160
161impl Kind {
162    fn try_clone(&self) -> Option<Kind> {
163        match self {
164            Kind::Reader(..) => None,
165            Kind::Bytes(v) => Some(Kind::Bytes(v.clone())),
166        }
167    }
168}
169
170impl From<Vec<u8>> for Body {
171    #[inline]
172    fn from(v: Vec<u8>) -> Body {
173        Body {
174            kind: Kind::Bytes(v.into()),
175        }
176    }
177}
178
179impl From<String> for Body {
180    #[inline]
181    fn from(s: String) -> Body {
182        s.into_bytes().into()
183    }
184}
185
186impl From<&'static [u8]> for Body {
187    #[inline]
188    fn from(s: &'static [u8]) -> Body {
189        Body {
190            kind: Kind::Bytes(Bytes::from_static(s)),
191        }
192    }
193}
194
195impl From<&'static str> for Body {
196    #[inline]
197    fn from(s: &'static str) -> Body {
198        s.as_bytes().into()
199    }
200}
201
202impl From<File> for Body {
203    #[inline]
204    fn from(f: File) -> Body {
205        let len = f.metadata().map(|m| m.len()).ok();
206        Body {
207            kind: Kind::Reader(Box::new(f), len),
208        }
209    }
210}
211impl From<Bytes> for Body {
212    #[inline]
213    fn from(b: Bytes) -> Body {
214        Body {
215            kind: Kind::Bytes(b),
216        }
217    }
218}
219
220impl fmt::Debug for Kind {
221    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
222        match *self {
223            Kind::Reader(_, ref v) => f
224                .debug_struct("Reader")
225                .field("length", &DebugLength(v))
226                .finish(),
227            Kind::Bytes(ref v) => fmt::Debug::fmt(v, f),
228        }
229    }
230}
231
232struct DebugLength<'a>(&'a Option<u64>);
233
234impl<'a> fmt::Debug for DebugLength<'a> {
235    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
236        match *self.0 {
237            Some(ref len) => fmt::Debug::fmt(len, f),
238            None => f.write_str("Unknown"),
239        }
240    }
241}
242
243#[cfg(feature = "multipart")]
244pub(crate) enum Reader {
245    Reader(Box<dyn Read + Send>),
246    Bytes(Cursor<Bytes>),
247}
248
249#[cfg(feature = "multipart")]
250impl Read for Reader {
251    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
252        match *self {
253            Reader::Reader(ref mut rdr) => rdr.read(buf),
254            Reader::Bytes(ref mut rdr) => rdr.read(buf),
255        }
256    }
257}
258
259pub(crate) struct Sender {
260    body: (Box<dyn Read + Send>, Option<u64>),
261    tx: mpsc::Sender<Result<Bytes, Abort>>,
262}
263
264#[derive(Debug)]
265struct Abort;
266
267impl fmt::Display for Abort {
268    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269        f.write_str("abort request body")
270    }
271}
272
273impl std::error::Error for Abort {}
274
275async fn send_future(sender: Sender) -> Result<(), crate::Error> {
276    use bytes::{BufMut, BytesMut};
277    use futures_util::SinkExt;
278    use std::cmp;
279
280    let con_len = sender.body.1;
281    let cap = cmp::min(sender.body.1.unwrap_or(8192), 8192);
282    let mut written = 0;
283    let mut buf = BytesMut::with_capacity(cap as usize);
284    let mut body = sender.body.0;
285    // Put in an option so that it can be consumed on error to call abort()
286    let mut tx = Some(sender.tx);
287
288    loop {
289        if Some(written) == con_len {
290            // Written up to content-length, so stop.
291            return Ok(());
292        }
293
294        // The input stream is read only if the buffer is empty so
295        // that there is only one read in the buffer at any time.
296        //
297        // We need to know whether there is any data to send before
298        // we check the transmission channel (with poll_ready below)
299        // because sometimes the receiver disappears as soon as it
300        // considers the data is completely transmitted, which may
301        // be true.
302        //
303        // The use case is a web server that closes its
304        // input stream as soon as the data received is valid JSON.
305        // This behaviour is questionable, but it exists and the
306        // fact is that there is actually no remaining data to read.
307        if buf.is_empty() {
308            if buf.remaining_mut() == 0 {
309                buf.reserve(8192);
310                // zero out the reserved memory
311                let uninit = buf.chunk_mut();
312                unsafe {
313                    ptr::write_bytes(uninit.as_mut_ptr(), 0, uninit.len());
314                }
315            }
316
317            let bytes = unsafe { mem::transmute::<&mut UninitSlice, &mut [u8]>(buf.chunk_mut()) };
318            match body.read(bytes) {
319                Ok(0) => {
320                    // The buffer was empty and nothing's left to
321                    // read. Return.
322                    return Ok(());
323                }
324                Ok(n) => unsafe {
325                    buf.advance_mut(n);
326                },
327                Err(e) => {
328                    let _ = tx
329                        .take()
330                        .expect("tx only taken on error")
331                        .clone()
332                        .try_send(Err(Abort));
333                    return Err(crate::error::body(e));
334                }
335            }
336        }
337
338        // The only way to get here is when the buffer is not empty.
339        // We can check the transmission channel
340
341        let buf_len = buf.len() as u64;
342        tx.as_mut()
343            .expect("tx only taken on error")
344            .send(Ok(buf.split().freeze()))
345            .await
346            .map_err(crate::error::body)?;
347
348        written += buf_len;
349    }
350}
351
352impl Sender {
353    // A `Future` that may do blocking read calls.
354    // As a `Future`, this integrates easily with `wait::timeout`.
355    pub(crate) fn send(self) -> impl Future<Output = Result<(), crate::Error>> {
356        send_future(self)
357    }
358}
359
360// useful for tests, but not publicly exposed
361#[cfg(test)]
362pub(crate) fn read_to_string(mut body: Body) -> io::Result<String> {
363    let mut s = String::new();
364    match body.kind {
365        Kind::Reader(ref mut reader, _) => reader.read_to_string(&mut s),
366        Kind::Bytes(ref mut bytes) => (&**bytes).read_to_string(&mut s),
367    }
368    .map(|_| s)
369}