zstd/stream/write/
mod.rs

//! Implements the push-based [`Write`] trait for both compression and decompression.
2use std::io::{self, Write};
3
4use zstd_safe;
5
6use crate::dict::{DecoderDictionary, EncoderDictionary};
7use crate::stream::{raw, zio};
8
9#[cfg(test)]
10mod tests;
11
12/// An encoder that compress and forward data to another writer.
13///
14/// This allows to compress a stream of data
15/// (good for files or heavy network stream).
16///
17/// Don't forget to call [`finish()`] before dropping it!
18///
19/// Alternatively, you can call [`auto_finish()`] to use an
20/// [`AutoFinishEncoder`] that will finish on drop.
21///
22/// Note: The zstd library has its own internal input buffer (~128kb).
23///
24/// [`finish()`]: #method.finish
25/// [`auto_finish()`]: #method.auto_finish
26/// [`AutoFinishEncoder`]: AutoFinishEncoder
27pub struct Encoder<'a, W: Write> {
28    // output writer (compressed data)
29    writer: zio::Writer<W, raw::Encoder<'a>>,
30}
31
32/// A decoder that decompress and forward data to another writer.
33///
34/// Note that you probably want to `flush()` after writing your stream content.
35/// You can use [`auto_flush()`] to automatically flush the writer on drop.
36///
37/// [`auto_flush()`]: Decoder::auto_flush
38pub struct Decoder<'a, W: Write> {
39    // output writer (decompressed data)
40    writer: zio::Writer<W, raw::Decoder<'a>>,
41}
42
43/// A wrapper around an `Encoder<W>` that finishes the stream on drop.
44///
45/// This can be created by the [`auto_finish()`] method on the [`Encoder`].
46///
47/// [`auto_finish()`]: Encoder::auto_finish
48/// [`Encoder`]: Encoder
49pub struct AutoFinishEncoder<
50    'a,
51    W: Write,
52    F: FnMut(io::Result<W>) = Box<dyn Send + FnMut(io::Result<W>)>,
53> {
54    // We wrap this in an option to take it during drop.
55    encoder: Option<Encoder<'a, W>>,
56
57    on_finish: Option<F>,
58}
59
60/// A wrapper around a `Decoder<W>` that flushes the stream on drop.
61///
62/// This can be created by the [`auto_flush()`] method on the [`Decoder`].
63///
64/// [`auto_flush()`]: Decoder::auto_flush
65/// [`Decoder`]: Decoder
66pub struct AutoFlushDecoder<
67    'a,
68    W: Write,
69    F: FnMut(io::Result<()>) = Box<dyn Send + FnMut(io::Result<()>)>,
70> {
71    // We wrap this in an option to take it during drop.
72    decoder: Option<Decoder<'a, W>>,
73
74    on_flush: Option<F>,
75}
76
77impl<'a, W: Write, F: FnMut(io::Result<()>)> AutoFlushDecoder<'a, W, F> {
78    fn new(decoder: Decoder<'a, W>, on_flush: F) -> Self {
79        AutoFlushDecoder {
80            decoder: Some(decoder),
81            on_flush: Some(on_flush),
82        }
83    }
84
85    /// Acquires a reference to the underlying writer.
86    pub fn get_ref(&self) -> &W {
87        self.decoder.as_ref().unwrap().get_ref()
88    }
89
90    /// Acquires a mutable reference to the underlying writer.
91    ///
92    /// Note that mutation of the writer may result in surprising results if
93    /// this decoder is continued to be used.
94    ///
95    /// Mostly used for testing purposes.
96    pub fn get_mut(&mut self) -> &mut W {
97        self.decoder.as_mut().unwrap().get_mut()
98    }
99}
100
101impl<W, F> Drop for AutoFlushDecoder<'_, W, F>
102where
103    W: Write,
104    F: FnMut(io::Result<()>),
105{
106    fn drop(&mut self) {
107        let mut decoder = self.decoder.take().unwrap();
108        let result = decoder.flush();
109        if let Some(mut on_finish) = self.on_flush.take() {
110            on_finish(result);
111        }
112    }
113}
114
115impl<W: Write, F: FnMut(io::Result<()>)> Write for AutoFlushDecoder<'_, W, F> {
116    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
117        self.decoder.as_mut().unwrap().write(buf)
118    }
119
120    fn flush(&mut self) -> io::Result<()> {
121        self.decoder.as_mut().unwrap().flush()
122    }
123}
124
125impl<'a, W: Write, F: FnMut(io::Result<W>)> AutoFinishEncoder<'a, W, F> {
126    fn new(encoder: Encoder<'a, W>, on_finish: F) -> Self {
127        AutoFinishEncoder {
128            encoder: Some(encoder),
129            on_finish: Some(on_finish),
130        }
131    }
132
133    /// Acquires a reference to the underlying writer.
134    pub fn get_ref(&self) -> &W {
135        self.encoder.as_ref().unwrap().get_ref()
136    }
137
138    /// Acquires a mutable reference to the underlying writer.
139    ///
140    /// Note that mutation of the writer may result in surprising results if
141    /// this encoder is continued to be used.
142    ///
143    /// Mostly used for testing purposes.
144    pub fn get_mut(&mut self) -> &mut W {
145        self.encoder.as_mut().unwrap().get_mut()
146    }
147}
148
149impl<W: Write, F: FnMut(io::Result<W>)> Drop for AutoFinishEncoder<'_, W, F> {
150    fn drop(&mut self) {
151        let result = self.encoder.take().unwrap().finish();
152        if let Some(mut on_finish) = self.on_finish.take() {
153            on_finish(result);
154        }
155    }
156}
157
158impl<W: Write, F: FnMut(io::Result<W>)> Write for AutoFinishEncoder<'_, W, F> {
159    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
160        self.encoder.as_mut().unwrap().write(buf)
161    }
162
163    fn flush(&mut self) -> io::Result<()> {
164        self.encoder.as_mut().unwrap().flush()
165    }
166}
167
168impl<W: Write> Encoder<'static, W> {
169    /// Creates a new encoder.
170    ///
171    /// `level`: compression level (1-22).
172    ///
173    /// A level of `0` uses zstd's default (currently `3`).
174    pub fn new(writer: W, level: i32) -> io::Result<Self> {
175        Self::with_dictionary(writer, level, &[])
176    }
177
178    /// Creates a new encoder, using an existing dictionary.
179    ///
180    /// (Provides better compression ratio for small files,
181    /// but requires the dictionary to be present during decompression.)
182    ///
183    /// A level of `0` uses zstd's default (currently `3`).
184    pub fn with_dictionary(
185        writer: W,
186        level: i32,
187        dictionary: &[u8],
188    ) -> io::Result<Self> {
189        let encoder = raw::Encoder::with_dictionary(level, dictionary)?;
190        let writer = zio::Writer::new(writer, encoder);
191        Ok(Encoder { writer })
192    }
193}
194
195impl<'a, W: Write> Encoder<'a, W> {
196    /// Creates an encoder that uses the provided context to compress a stream.
197    pub fn with_context(
198        writer: W,
199        context: &'a mut zstd_safe::CCtx<'static>,
200    ) -> Self {
201        Self {
202            writer: zio::Writer::new(
203                writer,
204                raw::Encoder::with_context(context),
205            ),
206        }
207    }
208
209    /// Creates a new encoder, using an existing prepared `EncoderDictionary`.
210    ///
211    /// (Provides better compression ratio for small files,
212    /// but requires the dictionary to be present during decompression.)
213    pub fn with_prepared_dictionary<'b>(
214        writer: W,
215        dictionary: &EncoderDictionary<'b>,
216    ) -> io::Result<Self>
217    where
218        'b: 'a,
219    {
220        let encoder = raw::Encoder::with_prepared_dictionary(dictionary)?;
221        let writer = zio::Writer::new(writer, encoder);
222        Ok(Encoder { writer })
223    }
224
225    /// Creates a new encoder, using a ref prefix
226    pub fn with_ref_prefix<'b>(
227        writer: W,
228        level: i32,
229        ref_prefix: &'b [u8],
230    ) -> io::Result<Self>
231    where
232        'b: 'a,
233    {
234        let encoder = raw::Encoder::with_ref_prefix(level, ref_prefix)?;
235        let writer = zio::Writer::new(writer, encoder);
236        Ok(Encoder { writer })
237    }
238
239    /// Returns a wrapper around `self` that will finish the stream on drop.
240    pub fn auto_finish(self) -> AutoFinishEncoder<'a, W> {
241        AutoFinishEncoder {
242            encoder: Some(self),
243            on_finish: None,
244        }
245    }
246
247    /// Returns an encoder that will finish the stream on drop.
248    ///
249    /// Calls the given callback with the result from `finish()`. This runs during drop so it's
250    /// important that the provided callback doesn't panic.
251    pub fn on_finish<F: FnMut(io::Result<W>)>(
252        self,
253        f: F,
254    ) -> AutoFinishEncoder<'a, W, F> {
255        AutoFinishEncoder::new(self, f)
256    }
257
258    /// Acquires a reference to the underlying writer.
259    pub fn get_ref(&self) -> &W {
260        self.writer.writer()
261    }
262
263    /// Acquires a mutable reference to the underlying writer.
264    ///
265    /// Note that mutation of the writer may result in surprising results if
266    /// this encoder is continued to be used.
267    pub fn get_mut(&mut self) -> &mut W {
268        self.writer.writer_mut()
269    }
270
271    /// **Required**: Finishes the stream.
272    ///
273    /// You *need* to finish the stream when you're done writing, either with
274    /// this method or with [`try_finish(self)`](#method.try_finish).
275    ///
276    /// This returns the inner writer in case you need it.
277    ///
278    /// To get back `self` in case an error happened, use `try_finish`.
279    ///
280    /// **Note**: If you don't want (or can't) call `finish()` manually after
281    ///           writing your data, consider using `auto_finish()` to get an
282    ///           `AutoFinishEncoder`.
283    pub fn finish(self) -> io::Result<W> {
284        self.try_finish().map_err(|(_, err)| err)
285    }
286
287    /// **Required**: Attempts to finish the stream.
288    ///
289    /// You *need* to finish the stream when you're done writing, either with
290    /// this method or with [`finish(self)`](#method.finish).
291    ///
292    /// This returns the inner writer if the finish was successful, or the
293    /// object plus an error if it wasn't.
294    ///
295    /// `write` on this object will panic after `try_finish` has been called,
296    /// even if it fails.
297    pub fn try_finish(mut self) -> Result<W, (Self, io::Error)> {
298        match self.writer.finish() {
299            // Return the writer, because why not
300            Ok(()) => Ok(self.writer.into_inner().0),
301            Err(e) => Err((self, e)),
302        }
303    }
304
305    /// Attempts to finish the stream.
306    ///
307    /// You *need* to finish the stream when you're done writing, either with
308    /// this method or with [`finish(self)`](#method.finish).
309    pub fn do_finish(&mut self) -> io::Result<()> {
310        self.writer.finish()
311    }
312
313    /// Return a recommendation for the size of data to write at once.
314    pub fn recommended_input_size() -> usize {
315        zstd_safe::CCtx::in_size()
316    }
317
318    crate::encoder_common!(writer);
319}
320
321impl<'a, W: Write> Write for Encoder<'a, W> {
322    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
323        self.writer.write(buf)
324    }
325
326    fn flush(&mut self) -> io::Result<()> {
327        self.writer.flush()
328    }
329}
330
331impl<W: Write> Decoder<'static, W> {
332    /// Creates a new decoder.
333    pub fn new(writer: W) -> io::Result<Self> {
334        Self::with_dictionary(writer, &[])
335    }
336
337    /// Creates a new decoder, using an existing dictionary.
338    ///
339    /// (Provides better compression ratio for small files,
340    /// but requires the dictionary to be present during decompression.)
341    pub fn with_dictionary(writer: W, dictionary: &[u8]) -> io::Result<Self> {
342        let decoder = raw::Decoder::with_dictionary(dictionary)?;
343        let writer = zio::Writer::new(writer, decoder);
344        Ok(Decoder { writer })
345    }
346}
347
348impl<'a, W: Write> Decoder<'a, W> {
349    /// Creates a new decoder, using an existing prepared `DecoderDictionary`.
350    ///
351    /// (Provides better compression ratio for small files,
352    /// but requires the dictionary to be present during decompression.)
353    pub fn with_prepared_dictionary<'b>(
354        writer: W,
355        dictionary: &DecoderDictionary<'b>,
356    ) -> io::Result<Self>
357    where
358        'b: 'a,
359    {
360        let decoder = raw::Decoder::with_prepared_dictionary(dictionary)?;
361        let writer = zio::Writer::new(writer, decoder);
362        Ok(Decoder { writer })
363    }
364
365    /// Acquires a reference to the underlying writer.
366    pub fn get_ref(&self) -> &W {
367        self.writer.writer()
368    }
369
370    /// Acquires a mutable reference to the underlying writer.
371    ///
372    /// Note that mutation of the writer may result in surprising results if
373    /// this decoder is continued to be used.
374    pub fn get_mut(&mut self) -> &mut W {
375        self.writer.writer_mut()
376    }
377
378    /// Returns the inner `Write`.
379    pub fn into_inner(self) -> W {
380        self.writer.into_inner().0
381    }
382
383    /// Return a recommendation for the size of data to write at once.
384    pub fn recommended_input_size() -> usize {
385        zstd_safe::DCtx::in_size()
386    }
387
388    /// Returns a wrapper around `self` that will flush the stream on drop.
389    pub fn auto_flush(self) -> AutoFlushDecoder<'a, W> {
390        AutoFlushDecoder {
391            decoder: Some(self),
392            on_flush: None,
393        }
394    }
395
396    /// Returns a decoder that will flush the stream on drop.
397    ///
398    /// Calls the given callback with the result from `flush()`. This runs during drop so it's
399    /// important that the provided callback doesn't panic.
400    pub fn on_flush<F: FnMut(io::Result<()>)>(
401        self,
402        f: F,
403    ) -> AutoFlushDecoder<'a, W, F> {
404        AutoFlushDecoder::new(self, f)
405    }
406
407    crate::decoder_common!(writer);
408}
409
410impl<W: Write> Write for Decoder<'_, W> {
411    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
412        self.writer.write(buf)
413    }
414
415    fn flush(&mut self) -> io::Result<()> {
416        self.writer.flush()
417    }
418}
419
420fn _assert_traits() {
421    fn _assert_send<T: Send>(_: T) {}
422
423    _assert_send(Decoder::new(Vec::new()));
424    _assert_send(Encoder::new(Vec::new(), 1));
425    _assert_send(Decoder::new(Vec::new()).unwrap().auto_flush());
426    _assert_send(Encoder::new(Vec::new(), 1).unwrap().auto_finish());
427}