Line data Source code
1 : //
2 : // Copyright (c) 2019 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2024 Christian Mazakas
4 : // Copyright (c) 2024 Mohammad Nejati
5 : //
6 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
7 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8 : //
9 : // Official repository: https://github.com/cppalliance/http_proto
10 : //
11 :
12 : #include <boost/http_proto/detail/except.hpp>
13 : #include <boost/http_proto/detail/header.hpp>
14 : #include <boost/http_proto/message_base.hpp>
15 : #include <boost/http_proto/serializer.hpp>
16 :
17 : #include "src/detail/array_of_const_buffers.hpp"
18 : #include "src/detail/brotli_filter_base.hpp"
19 : #include "src/detail/buffer_utils.hpp"
20 : #include "src/detail/zlib_filter_base.hpp"
21 :
22 : #include <boost/buffers/circular_buffer.hpp>
23 : #include <boost/buffers/copy.hpp>
24 : #include <boost/core/bit.hpp>
25 : #include <boost/core/ignore_unused.hpp>
26 : #include <boost/capy/brotli/encode.hpp>
27 : #include <boost/capy/polystore.hpp>
28 : #include <boost/capy/zlib/compression_method.hpp>
29 : #include <boost/capy/zlib/compression_strategy.hpp>
30 : #include <boost/capy/zlib/deflate.hpp>
31 : #include <boost/capy/zlib/error.hpp>
32 : #include <boost/capy/zlib/flush.hpp>
33 :
34 : #include <stddef.h>
35 :
36 : namespace boost {
37 : namespace http_proto {
38 :
39 : namespace {
40 :
41 : const
42 : buffers::const_buffer
43 : crlf_and_final_chunk = {"\r\n0\r\n\r\n", 7};
44 :
45 : const
46 : buffers::const_buffer
47 : crlf = {"\r\n", 2};
48 :
49 : const
50 : buffers::const_buffer
51 : final_chunk = {"0\r\n\r\n", 5};
52 :
53 : constexpr
54 : std::uint8_t
55 76 : chunk_header_len(
56 : std::size_t max_chunk_size) noexcept
57 : {
58 : return
59 : static_cast<uint8_t>(
60 76 : (core::bit_width(max_chunk_size) + 3) / 4 +
61 76 : 2); // crlf
62 : };
63 :
64 : void
65 1146 : write_chunk_header(
66 : const buffers::mutable_buffer_pair& mbs,
67 : std::size_t size) noexcept
68 : {
69 : static constexpr char hexdig[] =
70 : "0123456789ABCDEF";
71 : char buf[18];
72 1146 : auto p = buf + 16;
73 1146 : auto const n = buffers::size(mbs);
74 5729 : for(std::size_t i = n - 2; i--;)
75 : {
76 4583 : *--p = hexdig[size & 0xf];
77 4583 : size >>= 4;
78 : }
79 1146 : buf[16] = '\r';
80 1146 : buf[17] = '\n';
81 1146 : auto copied = buffers::copy(
82 : mbs,
83 2292 : buffers::const_buffer(p, n));
84 : ignore_unused(copied);
85 1146 : BOOST_ASSERT(copied == n);
86 1146 : }
87 :
88 : //------------------------------------------------
89 :
90 : class zlib_filter
91 : : public detail::zlib_filter_base
92 : {
93 : capy::zlib::deflate_service& svc_;
94 :
95 : public:
96 52 : zlib_filter(
97 : const capy::polystore& ctx,
98 : http_proto::detail::workspace& ws,
99 : int comp_level,
100 : int window_bits,
101 : int mem_level)
102 52 : : zlib_filter_base(ws)
103 52 : , svc_(ctx.get<capy::zlib::deflate_service>())
104 : {
105 104 : system::error_code ec = static_cast<capy::zlib::error>(svc_.init2(
106 52 : strm_,
107 : comp_level,
108 : capy::zlib::deflated,
109 : window_bits,
110 : mem_level,
111 52 : capy::zlib::default_strategy));
112 52 : if(ec != capy::zlib::error::ok)
113 0 : detail::throw_system_error(ec);
114 52 : }
115 :
116 : private:
117 : virtual
118 : std::size_t
119 86 : min_out_buffer() const noexcept override
120 : {
121 : // Prevents deflate from producing
122 : // zero output due to small buffer
123 86 : return 8;
124 : }
125 :
126 : virtual
127 : results
128 4922 : do_process(
129 : buffers::mutable_buffer out,
130 : buffers::const_buffer in,
131 : bool more) noexcept override
132 : {
133 4922 : strm_.next_out = static_cast<unsigned char*>(out.data());
134 4922 : strm_.avail_out = saturate_cast(out.size());
135 4922 : strm_.next_in = static_cast<unsigned char*>(const_cast<void *>(in.data()));
136 4922 : strm_.avail_in = saturate_cast(in.size());
137 :
138 : auto rs = static_cast<capy::zlib::error>(
139 4922 : svc_.deflate(
140 4922 : strm_,
141 : more ? capy::zlib::no_flush : capy::zlib::finish));
142 :
143 4922 : results rv;
144 4922 : rv.out_bytes = saturate_cast(out.size()) - strm_.avail_out;
145 4922 : rv.in_bytes = saturate_cast(in.size()) - strm_.avail_in;
146 4922 : rv.finished = (rs == capy::zlib::error::stream_end);
147 :
148 4922 : if(rs < capy::zlib::error::ok && rs != capy::zlib::error::buf_err)
149 0 : rv.ec = rs;
150 :
151 4922 : return rv;
152 : }
153 : };
154 :
155 : class brotli_filter
156 : : public detail::brotli_filter_base
157 : {
158 : capy::brotli::encode_service& svc_;
159 : capy::brotli::encoder_state* state_;
160 :
161 : public:
162 0 : brotli_filter(
163 : const capy::polystore& ctx,
164 : http_proto::detail::workspace&,
165 : std::uint32_t comp_quality,
166 : std::uint32_t comp_window)
167 0 : : svc_(ctx.get<capy::brotli::encode_service>())
168 : {
169 : // TODO: use custom allocator
170 0 : state_ = svc_.create_instance(nullptr, nullptr, nullptr);
171 0 : if(!state_)
172 0 : detail::throw_bad_alloc();
173 : using encoder_parameter = capy::brotli::encoder_parameter;
174 0 : svc_.set_parameter(state_, encoder_parameter::quality, comp_quality);
175 0 : svc_.set_parameter(state_, encoder_parameter::lgwin, comp_window);
176 0 : }
177 :
178 0 : ~brotli_filter()
179 0 : {
180 0 : svc_.destroy_instance(state_);
181 0 : }
182 :
183 : private:
184 : virtual
185 : results
186 0 : do_process(
187 : buffers::mutable_buffer out,
188 : buffers::const_buffer in,
189 : bool more) noexcept override
190 : {
191 0 : auto* next_in = reinterpret_cast<const std::uint8_t*>(in.data());
192 0 : auto available_in = in.size();
193 0 : auto* next_out = reinterpret_cast<std::uint8_t*>(out.data());
194 0 : auto available_out = out.size();
195 :
196 : using encoder_operation =
197 : capy::brotli::encoder_operation;
198 :
199 0 : bool rs = svc_.compress_stream(
200 : state_,
201 : more ? encoder_operation::process : encoder_operation::finish,
202 : &available_in,
203 : &next_in,
204 : &available_out,
205 : &next_out,
206 : nullptr);
207 :
208 0 : results rv;
209 0 : rv.in_bytes = in.size() - available_in;
210 0 : rv.out_bytes = out.size() - available_out;
211 0 : rv.finished = svc_.is_finished(state_);
212 :
213 : // TODO: use proper error code
214 0 : if(rs == false)
215 0 : rv.ec = error::bad_payload;
216 :
217 0 : return rv;
218 : }
219 : };
220 :
221 : template<class UInt>
222 : std::size_t
223 8 : clamp(
224 : UInt x,
225 : std::size_t limit = (std::numeric_limits<
226 : std::size_t>::max)()) noexcept
227 : {
228 8 : if(x >= limit)
229 2 : return limit;
230 6 : return static_cast<std::size_t>(x);
231 : }
232 :
233 : class serializer_service
234 : {
235 : public:
236 : serializer::config cfg;
237 : std::size_t space_needed = 0;
238 :
239 24 : serializer_service(
240 : serializer::config const& cfg_)
241 24 : : cfg(cfg_)
242 : {
243 24 : space_needed += cfg.payload_buffer;
244 24 : space_needed += cfg.max_type_erase;
245 :
246 24 : if(cfg.apply_deflate_encoder || cfg.apply_gzip_encoder)
247 : {
248 : // TODO: Account for the number of allocations and
249 : // their overhead in the workspace.
250 :
251 : // https://www.zlib.net/zlib_tech.html
252 1 : space_needed +=
253 1 : (1 << (cfg.zlib_window_bits + 2)) +
254 1 : (1 << (cfg.zlib_mem_level + 9)) +
255 1 : (6 * 1024) +
256 : #ifdef __s390x__
257 : 5768 +
258 : #endif
259 1 : detail::workspace::space_needed<zlib_filter>();
260 : }
261 24 : }
262 : };
263 :
264 : } // namespace
265 :
266 : //------------------------------------------------
267 :
268 : void
269 24 : install_serializer_service(
270 : capy::polystore& ctx,
271 : serializer::config const& cfg)
272 : {
273 24 : ctx.emplace<serializer_service>(cfg);
274 24 : }
275 :
276 : //------------------------------------------------
277 :
278 : class serializer::impl
279 : {
280 : friend stream;
281 :
282 : enum class state
283 : {
284 : reset,
285 : start,
286 : header,
287 : body
288 : };
289 :
290 : enum class style
291 : {
292 : empty,
293 : buffers,
294 : source,
295 : stream
296 : };
297 :
298 : const capy::polystore& ctx_;
299 : serializer_service& svc_;
300 : detail::workspace ws_;
301 :
302 : detail::filter* filter_ = nullptr;
303 : cbs_gen* cbs_gen_ = nullptr;
304 : source* source_ = nullptr;
305 :
306 : buffers::circular_buffer out_;
307 : buffers::circular_buffer in_;
308 : detail::array_of_const_buffers prepped_;
309 : buffers::const_buffer tmp_;
310 :
311 : state state_ = state::start;
312 : style style_ = style::empty;
313 : uint8_t chunk_header_len_ = 0;
314 : bool more_input_ = false;
315 : bool is_chunked_ = false;
316 : bool needs_exp100_continue_ = false;
317 : bool filter_done_ = false;
318 :
319 : public:
320 25 : impl(const capy::polystore& ctx)
321 25 : : ctx_(ctx)
322 25 : , svc_(ctx_.get<serializer_service>())
323 25 : , ws_(svc_.space_needed)
324 : {
325 25 : }
326 :
327 : void
328 82 : reset() noexcept
329 : {
330 82 : ws_.clear();
331 82 : state_ = state::start;
332 82 : }
333 :
334 : auto
335 2430 : prepare() ->
336 : system::result<const_buffers_type>
337 : {
338 : // Precondition violation
339 2430 : if(state_ < state::header)
340 2 : detail::throw_logic_error();
341 :
342 : // Expect: 100-continue
343 2428 : if(needs_exp100_continue_)
344 : {
345 4 : if(!is_header_done())
346 4 : return const_buffers_type(
347 : prepped_.begin(),
348 2 : 1); // limit to header
349 :
350 2 : needs_exp100_continue_ = false;
351 :
352 2 : BOOST_HTTP_PROTO_RETURN_EC(
353 : error::expect_100_continue);
354 : }
355 :
356 2424 : if(!filter_)
357 : {
358 62 : switch(style_)
359 : {
360 6 : case style::empty:
361 6 : break;
362 :
363 20 : case style::buffers:
364 : {
365 : // add more buffers if prepped_ is half empty.
366 30 : if(more_input_ &&
367 10 : prepped_.capacity() >= prepped_.size())
368 : {
369 4 : prepped_.slide_to_front();
370 50 : while(prepped_.capacity() != 0)
371 : {
372 48 : auto buf = cbs_gen_->next();
373 48 : if(buf.size() == 0)
374 2 : break;
375 46 : prepped_.append(buf);
376 : }
377 4 : if(cbs_gen_->is_empty())
378 : {
379 2 : if(is_chunked_)
380 : {
381 1 : if(prepped_.capacity() != 0)
382 : {
383 1 : prepped_.append(
384 : crlf_and_final_chunk);
385 1 : more_input_ = false;
386 : }
387 : }
388 : else
389 : {
390 1 : more_input_ = false;
391 : }
392 : }
393 : }
394 20 : return detail::make_span(prepped_);
395 : }
396 :
397 23 : case style::source:
398 : {
399 23 : if(out_capacity() == 0 || !more_input_)
400 22 : break;
401 :
402 7 : const auto rs = source_->read(
403 7 : out_prepare());
404 :
405 7 : out_commit(rs.bytes);
406 :
407 7 : if(rs.ec.failed())
408 : {
409 1 : ws_.clear();
410 1 : state_ = state::reset;
411 1 : return rs.ec;
412 : }
413 :
414 6 : if(rs.finished)
415 : {
416 6 : more_input_ = false;
417 6 : out_finish();
418 : }
419 :
420 6 : break;
421 : }
422 :
423 13 : case style::stream:
424 13 : if(out_.size() == 0 && is_header_done() && more_input_)
425 3 : BOOST_HTTP_PROTO_RETURN_EC(
426 : error::need_data);
427 10 : break;
428 : }
429 : }
430 : else // filter
431 : {
432 2362 : switch(style_)
433 : {
434 4 : case style::empty:
435 : {
436 4 : if(out_capacity() == 0 || filter_done_)
437 4 : break;
438 :
439 4 : const auto rs = filter_->process(
440 4 : detail::make_span(out_prepare()),
441 : {}, // empty input
442 : false);
443 :
444 4 : if(rs.ec.failed())
445 : {
446 0 : ws_.clear();
447 0 : state_ = state::reset;
448 0 : return rs.ec;
449 : }
450 :
451 4 : out_commit(rs.out_bytes);
452 :
453 4 : if(rs.finished)
454 : {
455 4 : filter_done_ = true;
456 4 : out_finish();
457 : }
458 :
459 4 : break;
460 : }
461 :
462 376 : case style::buffers:
463 : {
464 2572 : while(out_capacity() != 0 && !filter_done_)
465 : {
466 2196 : if(more_input_ && tmp_.size() == 0)
467 : {
468 2008 : tmp_ = cbs_gen_->next();
469 2008 : if(tmp_.size() == 0) // cbs_gen_ is empty
470 16 : more_input_ = false;
471 : }
472 :
473 2196 : const auto rs = filter_->process(
474 2196 : detail::make_span(out_prepare()),
475 : {{ {tmp_}, {} }},
476 2196 : more_input_);
477 :
478 2196 : if(rs.ec.failed())
479 : {
480 0 : ws_.clear();
481 0 : state_ = state::reset;
482 0 : return rs.ec;
483 : }
484 :
485 2196 : buffers::remove_prefix(tmp_, rs.in_bytes);
486 2196 : out_commit(rs.out_bytes);
487 :
488 2196 : if(rs.out_short)
489 0 : break;
490 :
491 2196 : if(rs.finished)
492 : {
493 16 : filter_done_ = true;
494 16 : out_finish();
495 : }
496 : }
497 376 : break;
498 : }
499 :
500 734 : case style::source:
501 : {
502 2180 : while(out_capacity() != 0 && !filter_done_)
503 : {
504 1446 : if(more_input_ && in_.capacity() != 0)
505 : {
506 984 : const auto rs = source_->read(
507 984 : in_.prepare(in_.capacity()));
508 984 : if(rs.ec.failed())
509 : {
510 0 : ws_.clear();
511 0 : state_ = state::reset;
512 0 : return rs.ec;
513 : }
514 984 : if(rs.finished)
515 16 : more_input_ = false;
516 984 : in_.commit(rs.bytes);
517 : }
518 :
519 1446 : const auto rs = filter_->process(
520 1446 : detail::make_span(out_prepare()),
521 : in_.data(),
522 1446 : more_input_);
523 :
524 1446 : if(rs.ec.failed())
525 : {
526 0 : ws_.clear();
527 0 : state_ = state::reset;
528 0 : return rs.ec;
529 : }
530 :
531 1446 : in_.consume(rs.in_bytes);
532 1446 : out_commit(rs.out_bytes);
533 :
534 1446 : if(rs.out_short)
535 0 : break;
536 :
537 1446 : if(rs.finished)
538 : {
539 16 : filter_done_ = true;
540 16 : out_finish();
541 : }
542 : }
543 734 : break;
544 : }
545 :
546 1248 : case style::stream:
547 : {
548 1248 : if(out_capacity() == 0 || filter_done_)
549 804 : break;
550 :
551 1248 : const auto rs = filter_->process(
552 1248 : detail::make_span(out_prepare()),
553 : in_.data(),
554 1248 : more_input_);
555 :
556 1248 : if(rs.ec.failed())
557 : {
558 0 : ws_.clear();
559 0 : state_ = state::reset;
560 444 : return rs.ec;
561 : }
562 :
563 1248 : in_.consume(rs.in_bytes);
564 1248 : out_commit(rs.out_bytes);
565 :
566 1248 : if(rs.finished)
567 : {
568 16 : filter_done_ = true;
569 16 : out_finish();
570 : }
571 :
572 1248 : if(out_.size() == 0 && is_header_done() && more_input_)
573 444 : BOOST_HTTP_PROTO_RETURN_EC(
574 : error::need_data);
575 804 : break;
576 : }
577 : }
578 : }
579 :
580 1956 : prepped_.reset(!is_header_done());
581 5868 : for(auto const& cb : out_.data())
582 : {
583 3912 : if(cb.size() != 0)
584 1948 : prepped_.append(cb);
585 : }
586 1956 : return detail::make_span(prepped_);
587 : }
588 :
589 : void
590 3717 : consume(
591 : std::size_t n)
592 : {
593 : // Precondition violation
594 3717 : if(state_ < state::header)
595 1 : detail::throw_logic_error();
596 :
597 3716 : if(!is_header_done())
598 : {
599 : const auto header_remain =
600 85 : prepped_[0].size();
601 85 : if(n < header_remain)
602 : {
603 12 : prepped_.consume(n);
604 12 : return;
605 : }
606 73 : n -= header_remain;
607 73 : prepped_.consume(header_remain);
608 73 : state_ = state::body;
609 : }
610 :
611 3704 : prepped_.consume(n);
612 :
613 : // no-op when out_ is not in use
614 3704 : out_.consume(n);
615 :
616 3704 : if(!prepped_.empty())
617 1759 : return;
618 :
619 1945 : if(more_input_)
620 1837 : return;
621 :
622 108 : if(filter_ && !filter_done_)
623 34 : return;
624 :
625 74 : if(needs_exp100_continue_)
626 1 : return;
627 :
628 : // ready for next message
629 73 : reset();
630 : }
631 :
632 : void
633 84 : start_init(
634 : message_base const& m)
635 : {
636 : // Precondition violation
637 84 : if(state_ != state::start)
638 1 : detail::throw_logic_error();
639 :
640 : // TODO: To uphold the strong exception guarantee,
641 : // `state_` must be reset to `state::start` if an
642 : // exception is thrown during the start operation.
643 83 : state_ = state::header;
644 :
645 : // VFALCO what do we do with
646 : // metadata error code failures?
647 : // m.h_.md.maybe_throw();
648 :
649 83 : auto const& md = m.metadata();
650 83 : needs_exp100_continue_ = md.expect.is_100_continue;
651 :
652 : // Transfer-Encoding
653 83 : is_chunked_ = md.transfer_encoding.is_chunked;
654 :
655 : // Content-Encoding
656 83 : switch (md.content_encoding.coding)
657 : {
658 26 : case content_coding::deflate:
659 26 : if(!svc_.cfg.apply_deflate_encoder)
660 0 : goto no_filter;
661 52 : filter_ = &ws_.emplace<zlib_filter>(
662 : ctx_,
663 26 : ws_,
664 26 : svc_.cfg.zlib_comp_level,
665 26 : svc_.cfg.zlib_window_bits,
666 26 : svc_.cfg.zlib_mem_level);
667 26 : filter_done_ = false;
668 26 : break;
669 :
670 26 : case content_coding::gzip:
671 26 : if(!svc_.cfg.apply_gzip_encoder)
672 0 : goto no_filter;
673 52 : filter_ = &ws_.emplace<zlib_filter>(
674 : ctx_,
675 26 : ws_,
676 26 : svc_.cfg.zlib_comp_level,
677 52 : svc_.cfg.zlib_window_bits + 16,
678 26 : svc_.cfg.zlib_mem_level);
679 26 : filter_done_ = false;
680 26 : break;
681 :
682 0 : case content_coding::br:
683 0 : if(!svc_.cfg.apply_brotli_encoder)
684 0 : goto no_filter;
685 0 : filter_ = &ws_.emplace<brotli_filter>(
686 : ctx_,
687 0 : ws_,
688 0 : svc_.cfg.brotli_comp_quality,
689 0 : svc_.cfg.brotli_comp_window);
690 0 : filter_done_ = false;
691 0 : break;
692 :
693 0 : no_filter:
694 31 : default:
695 31 : filter_ = nullptr;
696 31 : break;
697 : }
698 83 : }
699 :
700 : void
701 12 : start_empty(
702 : message_base const& m)
703 : {
704 12 : start_init(m);
705 11 : style_ = style::empty;
706 :
707 11 : prepped_ = make_array(
708 : 1 + // header
709 : 2); // out buffer pairs
710 :
711 11 : out_init();
712 :
713 11 : if(!filter_)
714 7 : out_finish();
715 :
716 11 : prepped_.append({ m.h_.cbuf, m.h_.size });
717 11 : more_input_ = false;
718 11 : }
719 :
720 : void
721 24 : start_buffers(
722 : message_base const& m,
723 : cbs_gen& cbs_gen)
724 : {
725 : // start_init() already called
726 24 : style_ = style::buffers;
727 24 : cbs_gen_ = &cbs_gen;
728 :
729 24 : if(!filter_)
730 : {
731 8 : auto stats = cbs_gen_->stats();
732 8 : auto batch_size = clamp(stats.count, 16);
733 :
734 0 : prepped_ = make_array(
735 : 1 + // header
736 8 : batch_size + // buffers
737 8 : (is_chunked_ ? 2 : 0)); // chunk header + final chunk
738 :
739 8 : prepped_.append({ m.h_.cbuf, m.h_.size });
740 8 : more_input_ = (batch_size != 0);
741 :
742 8 : if(is_chunked_)
743 : {
744 2 : if(!more_input_)
745 : {
746 1 : prepped_.append(final_chunk);
747 : }
748 : else
749 : {
750 1 : auto h_len = chunk_header_len(stats.size);
751 : buffers::mutable_buffer mb(
752 1 : ws_.reserve_front(h_len), h_len);
753 1 : write_chunk_header({{ {mb}, {} }}, stats.size);
754 1 : prepped_.append(mb);
755 : }
756 : }
757 8 : return;
758 : }
759 :
760 : // filter
761 :
762 16 : prepped_ = make_array(
763 : 1 + // header
764 : 2); // out buffer pairs
765 :
766 16 : out_init();
767 :
768 16 : prepped_.append({ m.h_.cbuf, m.h_.size });
769 16 : tmp_ = {};
770 16 : more_input_ = true;
771 : }
772 :
773 : void
774 25 : start_source(
775 : message_base const& m,
776 : source& source)
777 : {
778 : // start_init() already called
779 25 : style_ = style::source;
780 25 : source_ = &source;
781 :
782 25 : prepped_ = make_array(
783 : 1 + // header
784 : 2); // out buffer pairs
785 :
786 25 : if(filter_)
787 : {
788 : // TODO: smarter buffer distribution
789 16 : auto const n = (ws_.size() - 1) / 2;
790 16 : in_ = { ws_.reserve_front(n), n };
791 : }
792 :
793 25 : out_init();
794 :
795 25 : prepped_.append({ m.h_.cbuf, m.h_.size });
796 25 : more_input_ = true;
797 25 : }
798 :
799 : stream
800 23 : start_stream(message_base const& m)
801 : {
802 23 : start_init(m);
803 23 : style_ = style::stream;
804 :
805 23 : prepped_ = make_array(
806 : 1 + // header
807 : 2); // out buffer pairs
808 :
809 23 : if(filter_)
810 : {
811 : // TODO: smarter buffer distribution
812 16 : auto const n = (ws_.size() - 1) / 2;
813 16 : in_ = { ws_.reserve_front(n), n };
814 : }
815 :
816 23 : out_init();
817 :
818 23 : prepped_.append({ m.h_.cbuf, m.h_.size });
819 23 : more_input_ = true;
820 23 : return stream{ this };
821 : }
822 :
823 : bool
824 2483 : is_done() const noexcept
825 : {
826 2483 : return state_ == state::start;
827 : }
828 :
829 : detail::workspace&
830 49 : ws() noexcept
831 : {
832 49 : return ws_;
833 : }
834 :
835 : private:
836 : bool
837 6127 : is_header_done() const noexcept
838 : {
839 6127 : return state_ == state::body;
840 : }
841 :
842 : detail::array_of_const_buffers
843 83 : make_array(std::size_t n)
844 : {
845 83 : BOOST_ASSERT(n <= std::uint16_t(-1));
846 :
847 : return {
848 83 : ws_.push_array(n,
849 0 : buffers::const_buffer{}),
850 83 : static_cast<std::uint16_t>(n) };
851 : }
852 :
853 : void
854 75 : out_init()
855 : {
856 : // use all the remaining buffer
857 75 : auto const n = ws_.size() - 1;
858 75 : out_ = { ws_.reserve_front(n), n };
859 75 : chunk_header_len_ =
860 75 : chunk_header_len(out_.capacity());
861 75 : if(out_capacity() == 0)
862 0 : detail::throw_length_error();
863 75 : }
864 :
865 : buffers::mutable_buffer_pair
866 4907 : out_prepare() noexcept
867 : {
868 4907 : auto mbp = out_.prepare(out_.capacity());
869 4907 : if(is_chunked_)
870 : {
871 2453 : buffers::remove_prefix(
872 2453 : mbp, chunk_header_len_);
873 2453 : buffers::remove_suffix(
874 : mbp, crlf_and_final_chunk.size());
875 : }
876 4907 : return mbp;
877 : }
878 :
879 : void
880 4907 : out_commit(
881 : std::size_t n) noexcept
882 : {
883 4907 : if(is_chunked_)
884 : {
885 2453 : if(n == 0)
886 1308 : return;
887 :
888 1145 : write_chunk_header(out_.prepare(chunk_header_len_), n);
889 1145 : out_.commit(chunk_header_len_);
890 :
891 1145 : out_.prepare(n);
892 1145 : out_.commit(n);
893 :
894 1145 : buffers::copy(out_.prepare(crlf.size()), crlf);
895 1145 : out_.commit(crlf.size());
896 : }
897 : else
898 : {
899 2454 : out_.commit(n);
900 : }
901 : }
902 :
903 : std::size_t
904 6126 : out_capacity() const noexcept
905 : {
906 6126 : if(is_chunked_)
907 : {
908 3060 : auto const overhead = chunk_header_len_ +
909 3060 : crlf_and_final_chunk.size();
910 3060 : if(out_.capacity() < overhead)
911 541 : return 0;
912 2519 : return out_.capacity() - overhead;
913 : }
914 3066 : return out_.capacity();
915 : }
916 :
917 : void
918 72 : out_finish() noexcept
919 : {
920 72 : if(is_chunked_)
921 : {
922 33 : buffers::copy(
923 33 : out_.prepare(final_chunk.size()), final_chunk);
924 33 : out_.commit(final_chunk.size());
925 : }
926 72 : }
927 : };
928 :
929 : //------------------------------------------------
930 :
931 31 : serializer::
932 : ~serializer()
933 : {
934 31 : delete impl_;
935 31 : }
936 :
937 1 : serializer::
938 1 : serializer(serializer&& other) noexcept
939 1 : : impl_(other.impl_)
940 : {
941 1 : other.impl_ = nullptr;
942 1 : }
943 :
944 : serializer&
945 2 : serializer::
946 : operator=(serializer&& other) noexcept
947 : {
948 2 : if(this != &other)
949 : {
950 2 : delete impl_;
951 2 : impl_ = other.impl_;
952 2 : other.impl_ = nullptr;
953 : }
954 2 : return *this;
955 : }
956 :
957 25 : serializer::
958 25 : serializer(capy::polystore& ctx)
959 25 : : impl_(new impl(ctx))
960 : {
961 : // TODO: use a single allocation for
962 : // impl and workspace buffer.
963 25 : }
964 :
965 : void
966 9 : serializer::
967 : reset() noexcept
968 : {
969 9 : BOOST_ASSERT(impl_);
970 9 : impl_->reset();
971 9 : }
972 :
973 : void
974 12 : serializer::
975 : start(message_base const& m)
976 : {
977 12 : BOOST_ASSERT(impl_);
978 12 : impl_->start_empty(m);
979 11 : }
980 :
981 : auto
982 23 : serializer::
983 : start_stream(
984 : message_base const& m) -> stream
985 : {
986 23 : BOOST_ASSERT(impl_);
987 23 : return impl_->start_stream(m);
988 : }
989 :
990 : auto
991 2430 : serializer::
992 : prepare() ->
993 : system::result<const_buffers_type>
994 : {
995 2430 : BOOST_ASSERT(impl_);
996 2430 : return impl_->prepare();
997 : }
998 :
999 : void
1000 3717 : serializer::
1001 : consume(std::size_t n)
1002 : {
1003 3717 : BOOST_ASSERT(impl_);
1004 3717 : impl_->consume(n);
1005 3716 : }
1006 :
1007 : bool
1008 2483 : serializer::
1009 : is_done() const noexcept
1010 : {
1011 2483 : BOOST_ASSERT(impl_);
1012 2483 : return impl_->is_done();
1013 : }
1014 :
1015 : //------------------------------------------------
1016 :
1017 : detail::workspace&
1018 49 : serializer::
1019 : ws()
1020 : {
1021 49 : BOOST_ASSERT(impl_);
1022 49 : return impl_->ws();
1023 : }
1024 :
1025 : void
1026 49 : serializer::
1027 : start_init(message_base const& m)
1028 : {
1029 49 : BOOST_ASSERT(impl_);
1030 49 : impl_->start_init(m);
1031 49 : }
1032 :
1033 : void
1034 24 : serializer::
1035 : start_buffers(
1036 : message_base const& m,
1037 : cbs_gen& cbs_gen)
1038 : {
1039 24 : BOOST_ASSERT(impl_);
1040 24 : impl_->start_buffers(m, cbs_gen);
1041 24 : }
1042 :
1043 : void
1044 25 : serializer::
1045 : start_source(
1046 : message_base const& m,
1047 : source& source)
1048 : {
1049 25 : BOOST_ASSERT(impl_);
1050 25 : impl_->start_source(m, source);
1051 25 : }
1052 :
1053 : //------------------------------------------------
1054 :
1055 : std::size_t
1056 1261 : serializer::
1057 : stream::
1058 : capacity() const
1059 : {
1060 : // Precondition violation
1061 1261 : if(!is_open())
1062 1 : detail::throw_logic_error();
1063 :
1064 1260 : if(impl_->filter_)
1065 1236 : return impl_->in_.capacity();
1066 :
1067 24 : return impl_->out_capacity();
1068 : }
1069 :
1070 : auto
1071 1243 : serializer::
1072 : stream::
1073 : prepare() ->
1074 : mutable_buffers_type
1075 : {
1076 : // Precondition violation
1077 1243 : if(!is_open())
1078 1 : detail::throw_logic_error();
1079 :
1080 1242 : if(impl_->filter_)
1081 1236 : return impl_->in_.prepare(
1082 2472 : impl_->in_.capacity());
1083 :
1084 6 : return impl_->out_prepare();
1085 : }
1086 :
1087 : void
1088 1244 : serializer::
1089 : stream::
1090 : commit(std::size_t n)
1091 : {
1092 : // Precondition violation
1093 1244 : if(!is_open())
1094 1 : detail::throw_logic_error();
1095 :
1096 : // Precondition violation
1097 1243 : if(n > capacity())
1098 1 : detail::throw_invalid_argument();
1099 :
1100 1242 : if(impl_->filter_)
1101 1236 : return impl_->in_.commit(n);
1102 :
1103 6 : impl_->out_commit(n);
1104 : }
1105 :
1106 : void
1107 47 : serializer::
1108 : stream::
1109 : close() noexcept
1110 : {
1111 47 : if(!is_open())
1112 24 : return; // no-op;
1113 :
1114 23 : if(!impl_->filter_)
1115 7 : impl_->out_finish();
1116 :
1117 23 : impl_->more_input_ = false;
1118 23 : impl_ = nullptr;
1119 : }
1120 :
1121 : } // http_proto
1122 : } // boost
|