1use crate::path::Path;
21use crate::{
22 Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, TagSet,
23 WriteMultipart,
24};
25use bytes::Bytes;
26use futures::future::{BoxFuture, FutureExt};
27use futures::ready;
28use std::cmp::Ordering;
29use std::io::{Error, ErrorKind, SeekFrom};
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, Poll};
33use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
34
35pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;
37
38pub struct BufReader {
56 store: Arc<dyn ObjectStore>,
58 size: u64,
60 path: Path,
62 cursor: u64,
64 capacity: usize,
66 buffer: Buffer,
68}
69
70impl std::fmt::Debug for BufReader {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("BufReader")
73 .field("path", &self.path)
74 .field("size", &self.size)
75 .field("capacity", &self.capacity)
76 .finish()
77 }
78}
79
80enum Buffer {
81 Empty,
82 Pending(BoxFuture<'static, std::io::Result<Bytes>>),
83 Ready(Bytes),
84}
85
86impl BufReader {
87 pub fn new(store: Arc<dyn ObjectStore>, meta: &ObjectMeta) -> Self {
89 Self::with_capacity(store, meta, DEFAULT_BUFFER_SIZE)
90 }
91
92 pub fn with_capacity(store: Arc<dyn ObjectStore>, meta: &ObjectMeta, capacity: usize) -> Self {
94 Self {
95 path: meta.location.clone(),
96 size: meta.size as _,
97 store,
98 capacity,
99 cursor: 0,
100 buffer: Buffer::Empty,
101 }
102 }
103
104 fn poll_fill_buf_impl(
105 &mut self,
106 cx: &mut Context<'_>,
107 amnt: usize,
108 ) -> Poll<std::io::Result<&[u8]>> {
109 let buf = &mut self.buffer;
110 loop {
111 match buf {
112 Buffer::Empty => {
113 let store = Arc::clone(&self.store);
114 let path = self.path.clone();
115 let start = self.cursor.min(self.size) as _;
116 let end = self.cursor.saturating_add(amnt as u64).min(self.size) as _;
117
118 if start == end {
119 return Poll::Ready(Ok(&[]));
120 }
121
122 *buf = Buffer::Pending(Box::pin(async move {
123 Ok(store.get_range(&path, start..end).await?)
124 }))
125 }
126 Buffer::Pending(fut) => match ready!(fut.poll_unpin(cx)) {
127 Ok(b) => *buf = Buffer::Ready(b),
128 Err(e) => return Poll::Ready(Err(e)),
129 },
130 Buffer::Ready(r) => return Poll::Ready(Ok(r)),
131 }
132 }
133 }
134}
135
136impl AsyncSeek for BufReader {
137 fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
138 self.cursor = match position {
139 SeekFrom::Start(offset) => offset,
140 SeekFrom::End(offset) => checked_add_signed(self.size, offset).ok_or_else(|| {
141 Error::new(
142 ErrorKind::InvalidInput,
143 format!(
144 "Seeking {offset} from end of {} byte file would result in overflow",
145 self.size
146 ),
147 )
148 })?,
149 SeekFrom::Current(offset) => {
150 checked_add_signed(self.cursor, offset).ok_or_else(|| {
151 Error::new(
152 ErrorKind::InvalidInput,
153 format!(
154 "Seeking {offset} from current offset of {} would result in overflow",
155 self.cursor
156 ),
157 )
158 })?
159 }
160 };
161 self.buffer = Buffer::Empty;
162 Ok(())
163 }
164
165 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
166 Poll::Ready(Ok(self.cursor))
167 }
168}
169
170impl AsyncRead for BufReader {
171 fn poll_read(
172 mut self: Pin<&mut Self>,
173 cx: &mut Context<'_>,
174 out: &mut ReadBuf<'_>,
175 ) -> Poll<std::io::Result<()>> {
176 let to_read = out.remaining().max(self.capacity);
178 let r = match ready!(self.poll_fill_buf_impl(cx, to_read)) {
179 Ok(buf) => {
180 let to_consume = out.remaining().min(buf.len());
181 out.put_slice(&buf[..to_consume]);
182 self.consume(to_consume);
183 Ok(())
184 }
185 Err(e) => Err(e),
186 };
187 Poll::Ready(r)
188 }
189}
190
191impl AsyncBufRead for BufReader {
192 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
193 let capacity = self.capacity;
194 self.get_mut().poll_fill_buf_impl(cx, capacity)
195 }
196
197 fn consume(mut self: Pin<&mut Self>, amt: usize) {
198 match &mut self.buffer {
199 Buffer::Empty => assert_eq!(amt, 0, "cannot consume from empty buffer"),
200 Buffer::Ready(b) => match b.len().cmp(&amt) {
201 Ordering::Less => panic!("{amt} exceeds buffer sized of {}", b.len()),
202 Ordering::Greater => *b = b.slice(amt..),
203 Ordering::Equal => self.buffer = Buffer::Empty,
204 },
205 Buffer::Pending(_) => panic!("cannot consume from pending buffer"),
206 }
207 self.cursor += amt as u64;
208 }
209}
210
211pub struct BufWriter {
221 capacity: usize,
222 max_concurrency: usize,
223 attributes: Option<Attributes>,
224 tags: Option<TagSet>,
225 state: BufWriterState,
226 store: Arc<dyn ObjectStore>,
227}
228
229impl std::fmt::Debug for BufWriter {
230 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
231 f.debug_struct("BufWriter")
232 .field("capacity", &self.capacity)
233 .finish()
234 }
235}
236
237enum BufWriterState {
238 Buffer(Path, PutPayloadMut),
240 Prepare(BoxFuture<'static, crate::Result<WriteMultipart>>),
242 Write(Option<WriteMultipart>),
244 Flush(BoxFuture<'static, crate::Result<()>>),
246}
247
248impl BufWriter {
249 pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
251 Self::with_capacity(store, path, 10 * 1024 * 1024)
252 }
253
254 pub fn with_capacity(store: Arc<dyn ObjectStore>, path: Path, capacity: usize) -> Self {
256 Self {
257 capacity,
258 store,
259 max_concurrency: 8,
260 attributes: None,
261 tags: None,
262 state: BufWriterState::Buffer(path, PutPayloadMut::new()),
263 }
264 }
265
266 pub fn with_max_concurrency(self, max_concurrency: usize) -> Self {
270 Self {
271 max_concurrency,
272 ..self
273 }
274 }
275
276 pub fn with_attributes(self, attributes: Attributes) -> Self {
278 Self {
279 attributes: Some(attributes),
280 ..self
281 }
282 }
283
284 pub fn with_tags(self, tags: TagSet) -> Self {
286 Self {
287 tags: Some(tags),
288 ..self
289 }
290 }
291
292 pub async fn put(&mut self, bytes: Bytes) -> crate::Result<()> {
298 loop {
299 return match &mut self.state {
300 BufWriterState::Write(Some(write)) => {
301 write.wait_for_capacity(self.max_concurrency).await?;
302 write.put(bytes);
303 Ok(())
304 }
305 BufWriterState::Write(None) | BufWriterState::Flush(_) => {
306 panic!("Already shut down")
307 }
308 BufWriterState::Prepare(f) => {
315 self.state = BufWriterState::Write(f.await?.into());
316 continue;
317 }
318 BufWriterState::Buffer(path, b) => {
319 if b.content_length().saturating_add(bytes.len()) < self.capacity {
320 b.push(bytes);
321 Ok(())
322 } else {
323 let buffer = std::mem::take(b);
324 let path = std::mem::take(path);
325 let opts = PutMultipartOpts {
326 attributes: self.attributes.take().unwrap_or_default(),
327 tags: self.tags.take().unwrap_or_default(),
328 };
329 let upload = self.store.put_multipart_opts(&path, opts).await?;
330 let mut chunked =
331 WriteMultipart::new_with_chunk_size(upload, self.capacity);
332 for chunk in buffer.freeze() {
333 chunked.put(chunk);
334 }
335 chunked.put(bytes);
336 self.state = BufWriterState::Write(Some(chunked));
337 Ok(())
338 }
339 }
340 };
341 }
342 }
343
344 pub async fn abort(&mut self) -> crate::Result<()> {
350 match &mut self.state {
351 BufWriterState::Buffer(_, _) | BufWriterState::Prepare(_) => Ok(()),
352 BufWriterState::Flush(_) => panic!("Already shut down"),
353 BufWriterState::Write(x) => x.take().unwrap().abort().await,
354 }
355 }
356}
357
/// Drives the [`BufWriterState`] machine: `Buffer` -> (`Prepare` -> `Write`) -> `Flush`.
impl AsyncWrite for BufWriter {
    /// Buffer `buf` in memory, or hand it to the multipart writer once the
    /// buffered size would reach `capacity`.
    ///
    /// # Panics
    ///
    /// Panics if called after shutdown has begun.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, Error>> {
        // Copy out before borrowing `self.state` mutably below
        let cap = self.capacity;
        let max_concurrency = self.max_concurrency;
        loop {
            return match &mut self.state {
                BufWriterState::Write(Some(write)) => {
                    // Apply backpressure before accepting more data
                    ready!(write.poll_for_capacity(cx, max_concurrency))?;
                    write.write(buf);
                    Poll::Ready(Ok(buf.len()))
                }
                BufWriterState::Write(None) | BufWriterState::Flush(_) => {
                    panic!("Already shut down")
                }
                BufWriterState::Prepare(f) => {
                    self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
                    continue;
                }
                BufWriterState::Buffer(path, b) => {
                    if b.content_length().saturating_add(buf.len()) >= cap {
                        // Threshold reached: start a multipart upload seeded with
                        // the buffered data. `buf` itself is written on the next
                        // loop iteration, once the upload is ready.
                        let buffer = std::mem::take(b);
                        let path = std::mem::take(path);
                        let opts = PutMultipartOpts {
                            attributes: self.attributes.take().unwrap_or_default(),
                            tags: self.tags.take().unwrap_or_default(),
                        };
                        let store = Arc::clone(&self.store);
                        self.state = BufWriterState::Prepare(Box::pin(async move {
                            let upload = store.put_multipart_opts(&path, opts).await?;
                            let mut chunked = WriteMultipart::new_with_chunk_size(upload, cap);
                            for chunk in buffer.freeze() {
                                chunked.put(chunk);
                            }
                            Ok(chunked)
                        }));
                        continue;
                    }
                    b.extend_from_slice(buf);
                    Poll::Ready(Ok(buf.len()))
                }
            };
        }
    }

    /// Completes a pending `Prepare` step only. Data buffered in the `Buffer`
    /// state is not uploaded here, and the multipart writer is not polled in
    /// the `Write` state.
    ///
    /// # Panics
    ///
    /// Panics if shutdown has already begun.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        loop {
            return match &mut self.state {
                BufWriterState::Write(_) | BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())),
                BufWriterState::Flush(_) => panic!("Already shut down"),
                BufWriterState::Prepare(f) => {
                    self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
                    continue;
                }
            };
        }
    }

    /// Upload any remaining data and complete the write.
    ///
    /// Small writes still in the `Buffer` state become a single `put_opts`;
    /// a multipart upload is completed with `finish`. Either way the state
    /// moves to `Flush`, whose future is then polled to completion.
    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        loop {
            match &mut self.state {
                BufWriterState::Prepare(f) => {
                    self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
                }
                BufWriterState::Buffer(p, b) => {
                    // Never reached the multipart threshold: upload in one request
                    let buf = std::mem::take(b);
                    let path = std::mem::take(p);
                    let opts = PutOptions {
                        attributes: self.attributes.take().unwrap_or_default(),
                        tags: self.tags.take().unwrap_or_default(),
                        ..Default::default()
                    };
                    let store = Arc::clone(&self.store);
                    self.state = BufWriterState::Flush(Box::pin(async move {
                        store.put_opts(&path, buf.into(), opts).await?;
                        Ok(())
                    }));
                }
                BufWriterState::Flush(f) => return f.poll_unpin(cx).map_err(std::io::Error::from),
                BufWriterState::Write(x) => {
                    // `None` here means the upload was already taken (e.g. by `abort`)
                    let upload = x.take().ok_or_else(|| {
                        std::io::Error::new(
                            ErrorKind::InvalidInput,
                            "Cannot shutdown a writer that has already been shut down",
                        )
                    })?;
                    self.state = BufWriterState::Flush(
                        async move {
                            upload.finish().await?;
                            Ok(())
                        }
                        .boxed(),
                    )
                }
            }
        }
    }
}
459
/// Add a signed offset to an unsigned position.
///
/// Returns `None` if the result would be negative or exceed `u64::MAX`. This
/// mirrors `u64::checked_add_signed` from the standard library.
#[inline]
fn checked_add_signed(a: u64, rhs: i64) -> Option<u64> {
    if rhs < 0 {
        // `unsigned_abs` handles `i64::MIN` without overflow
        a.checked_sub(rhs.unsigned_abs())
    } else {
        a.checked_add(rhs as u64)
    }
}
469
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::InMemory;
    use crate::path::Path;
    use crate::{Attribute, GetOptions};
    use itertools::Itertools;
    use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

    /// Exercises `BufReader` reads, seek overflow errors, seeking past EOF and
    /// buffer sizing across several capacities.
    #[tokio::test]
    async fn test_buf_reader() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

        let existent = Path::from("exists.txt");
        const BYTES: usize = 4096;

        // Repeating "12345678" pattern so every 8-byte aligned chunk is identical
        let data: Bytes = b"12345678".iter().cycle().copied().take(BYTES).collect();
        store.put(&existent, data.clone().into()).await.unwrap();

        let meta = store.head(&existent).await.unwrap();

        let mut reader = BufReader::new(Arc::clone(&store), &meta);
        let mut out = Vec::with_capacity(BYTES);
        let read = reader.read_to_end(&mut out).await.unwrap();

        assert_eq!(read, BYTES);
        assert_eq!(&out, &data);

        // Relative seeks that would go below zero must fail with an error
        let err = reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Seeking -9223372036854775808 from current offset of 4096 would result in overflow"
        );

        reader.rewind().await.unwrap();

        let err = reader.seek(SeekFrom::Current(-1)).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Seeking -1 from current offset of 0 would result in overflow"
        );

        // Seeking past EOF is allowed; filling there yields no data
        reader.seek(SeekFrom::Start(u64::MAX)).await.unwrap();
        let buf = reader.fill_buf().await.unwrap();
        assert!(buf.is_empty());

        // ...but moving further would overflow u64
        let err = reader.seek(SeekFrom::Current(1)).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Seeking 1 from current offset of 18446744073709551615 would result in overflow"
        );

        for capacity in [200, 1024, 4096, DEFAULT_BUFFER_SIZE] {
            let store = Arc::clone(&store);
            let mut reader = BufReader::with_capacity(store, &meta, capacity);

            // Consume the whole object 8 bytes at a time through `fill_buf`
            let mut bytes_read = 0;
            loop {
                let buf = reader.fill_buf().await.unwrap();
                if buf.is_empty() {
                    assert_eq!(bytes_read, BYTES);
                    break;
                }
                assert!(buf.starts_with(b"12345678"));
                bytes_read += 8;
                reader.consume(8);
            }

            // Seek backwards from EOF and read the tail
            let mut buf = Vec::with_capacity(76);
            reader.seek(SeekFrom::Current(-76)).await.unwrap();
            reader.read_to_end(&mut buf).await.unwrap();
            assert_eq!(&buf, &data[BYTES - 76..]);

            // A fill returns at most `capacity` bytes, clamped to the object end
            reader.rewind().await.unwrap();
            let buffer = reader.fill_buf().await.unwrap();
            assert_eq!(buffer, &data[..capacity.min(BYTES)]);

            reader.seek(SeekFrom::Start(325)).await.unwrap();
            let buffer = reader.fill_buf().await.unwrap();
            assert_eq!(buffer, &data[325..(325 + capacity).min(BYTES)]);

            reader.seek(SeekFrom::End(0)).await.unwrap();
            let buffer = reader.fill_buf().await.unwrap();
            assert!(buffer.is_empty());
        }
    }

    /// Checks that attributes are preserved on both the single-put path
    /// (total below capacity) and the multipart path (total above capacity).
    #[tokio::test]
    async fn test_buf_writer() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
        let path = Path::from("file.txt");
        let attributes = Attributes::from_iter([
            (Attribute::ContentType, "text/html"),
            (Attribute::CacheControl, "max-age=604800"),
        ]);

        // 20 + 5 bytes < 30 capacity: stays buffered, uploaded with one put
        let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30)
            .with_attributes(attributes.clone());
        writer.write_all(&[0; 20]).await.unwrap();
        writer.flush().await.unwrap();
        writer.write_all(&[0; 5]).await.unwrap();
        writer.shutdown().await.unwrap();
        let response = store
            .get_opts(
                &path,
                GetOptions {
                    head: true,
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        assert_eq!(response.meta.size, 25);
        assert_eq!(response.attributes, attributes);

        // 20 + 20 bytes >= 30 capacity: switches to a multipart upload
        let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30)
            .with_attributes(attributes.clone());
        writer.write_all(&[0; 20]).await.unwrap();
        writer.flush().await.unwrap();
        writer.write_all(&[0; 20]).await.unwrap();
        writer.shutdown().await.unwrap();
        let response = store
            .get_opts(
                &path,
                GetOptions {
                    head: true,
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        assert_eq!(response.meta.size, 40);
        assert_eq!(response.attributes, attributes);
    }

    /// Same two scenarios as `test_buf_writer`, driven through `BufWriter::put`,
    /// also checking that byte order is preserved.
    #[tokio::test]
    async fn test_buf_writer_with_put() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
        let path = Path::from("file.txt");

        // Below capacity: single put on shutdown
        let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
        writer
            .put(Bytes::from((0..20).collect_vec()))
            .await
            .unwrap();
        writer
            .put(Bytes::from((20..25).collect_vec()))
            .await
            .unwrap();
        writer.shutdown().await.unwrap();
        let response = store
            .get_opts(
                &path,
                GetOptions {
                    head: true,
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        assert_eq!(response.meta.size, 25);
        assert_eq!(response.bytes().await.unwrap(), (0..25).collect_vec());

        // Above capacity: multipart upload seeded with the buffered bytes
        let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
        writer
            .put(Bytes::from((0..20).collect_vec()))
            .await
            .unwrap();
        writer
            .put(Bytes::from((20..40).collect_vec()))
            .await
            .unwrap();
        writer.shutdown().await.unwrap();
        let response = store
            .get_opts(
                &path,
                GetOptions {
                    head: true,
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        assert_eq!(response.meta.size, 40);
        assert_eq!(response.bytes().await.unwrap(), (0..40).collect_vec());
    }
}