44#include "D4StreamMarshaller.h"
45#ifdef USE_POSIX_THREADS
46#include "MarshallerThread.h"
49#if USE_XDR_FOR_IEEE754_ENCODING
66inline uint8_t* WriteVarint64ToArrayInline(uint64_t value, uint8_t* target) {
69 uint32_t part0 =
static_cast<uint32_t
>(value );
70 uint32_t part1 =
static_cast<uint32_t
>(value >> 28);
71 uint32_t part2 =
static_cast<uint32_t
>(value >> 56);
84 if (part0 < (1 << 14)) {
85 if (part0 < (1 << 7)) {
91 if (part0 < (1 << 21)) {
98 if (part1 < (1 << 14)) {
99 if (part1 < (1 << 7)) {
100 size = 5;
goto size5;
102 size = 6;
goto size6;
105 if (part1 < (1 << 21)) {
106 size = 7;
goto size7;
108 size = 8;
goto size8;
113 if (part2 < (1 << 7)) {
114 size = 9;
goto size9;
116 size = 10;
goto size10;
122 size10: target[9] =
static_cast<uint8_t
>((part2 >> 7) | 0x80);
123 size9 : target[8] =
static_cast<uint8_t
>((part2 ) | 0x80);
124 size8 : target[7] =
static_cast<uint8_t
>((part1 >> 21) | 0x80);
125 size7 : target[6] =
static_cast<uint8_t
>((part1 >> 14) | 0x80);
126 size6 : target[5] =
static_cast<uint8_t
>((part1 >> 7) | 0x80);
127 size5 : target[4] =
static_cast<uint8_t
>((part1 ) | 0x80);
128 size4 : target[3] =
static_cast<uint8_t
>((part0 >> 21) | 0x80);
129 size3 : target[2] =
static_cast<uint8_t
>((part0 >> 14) | 0x80);
130 size2 : target[1] =
static_cast<uint8_t
>((part0 >> 7) | 0x80);
131 size1 : target[0] =
static_cast<uint8_t
>((part0 ) | 0x80);
133 target[size-1] &= 0x7F;
134 return target + size;
138#if USE_XDR_FOR_IEEE754_ENCODING
139void D4StreamMarshaller::m_serialize_reals(
char *val,
unsigned int num,
int width,
Type type) {
140 dods_uint64 size = num * width;
142 char *buf =
new char[size];
144 xdrmem_create(&xdr, buf.data(), size, XDR_ENCODE);
147 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 array");
149 if (xdr_getpos(&xdr) != size)
150 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 array");
156 dods_float32 *lbuf =
reinterpret_cast<dods_float32 *
>(buf.data());
158 dods_int32 *i =
reinterpret_cast<dods_int32 *
>(lbuf++);
162 dods_float64 *lbuf =
reinterpret_cast<dods_float64 *
>(buf.data());
164 dods_int64 *i =
reinterpret_cast<dods_int64 *
>(lbuf++);
169#ifdef USE_POSIX_THREADS
170 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
172 tm->increment_child_thread_count();
178 d_out.write(buf.data(), size);
198D4StreamMarshaller::D4StreamMarshaller(ostream &out,
bool write_data) : d_out(out), d_write_data(write_data), tm(0) {
199 assert(
sizeof(std::streamsize) >=
sizeof(int64_t));
201#if USE_XDR_FOR_IEEE754_ENCODING
205 xdrmem_create(&d_scalar_sink, d_ieee754_buf,
sizeof(dods_float64), XDR_ENCODE);
208#ifdef USE_POSIX_THREADS
214 out.exceptions(ostream::failbit | ostream::badbit);
217D4StreamMarshaller::~D4StreamMarshaller() {
218#if USE_XDR_FOR_IEEE754_ENCODING
219 xdr_destroy(&d_scalar_sink);
241 oss.setf(ios::hex, ios::basefield);
242 oss << setfill(
'0') << setw(8) << d_checksum.GetCrc32();
254 Crc32::checksum chk = d_checksum.GetCrc32();
255#ifdef USE_POSIX_THREADS
256 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
258 d_out.write(
reinterpret_cast<char *
>(&chk),
sizeof(Crc32::checksum));
266 d_checksum.AddData(
reinterpret_cast<const uint8_t *
>(data), len);
269void D4StreamMarshaller::put_byte(dods_byte val) {
273 DBG(std::cerr <<
"put_byte: " << val << std::endl);
274#ifdef USE_POSIX_THREADS
276 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
278 d_out.write(
reinterpret_cast<char *
>(&val),
sizeof(dods_byte));
282void D4StreamMarshaller::put_int8(dods_int8 val) {
286 DBG(std::cerr <<
"put_int8: " << val << std::endl);
287#ifdef USE_POSIX_THREADS
288 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
290 d_out.write(
reinterpret_cast<char *
>(&val),
sizeof(dods_int8));
294void D4StreamMarshaller::put_int16(dods_int16 val) {
298#ifdef USE_POSIX_THREADS
299 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
301 d_out.write(
reinterpret_cast<char *
>(&val),
sizeof(dods_int16));
305void D4StreamMarshaller::put_int32(dods_int32 val) {
309#ifdef USE_POSIX_THREADS
310 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
312 d_out.write(
reinterpret_cast<char *
>(&val),
sizeof(dods_int32));
316void D4StreamMarshaller::put_int64(dods_int64 val) {
320#ifdef USE_POSIX_THREADS
321 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
323 d_out.write(
reinterpret_cast<const char *
>(&val),
sizeof(dods_int64));
327void D4StreamMarshaller::put_float32(dods_float32 val) {
328#if !USE_XDR_FOR_IEEE754_ENCODING
329 assert(std::numeric_limits<float>::is_iec559);
334#ifdef USE_POSIX_THREADS
335 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
337 d_out.write(
reinterpret_cast<const char *
>(&val),
sizeof(dods_float32));
347 if (std::numeric_limits<float>::is_iec559) {
348#ifdef USE_POSIX_THREADS
349 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
351 d_out.write(
reinterpret_cast<char *
>(&val),
sizeof(dods_float32));
353 if (!xdr_setpos(&d_scalar_sink, 0))
354 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float32 variable");
356 if (!xdr_float(&d_scalar_sink, &val))
357 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float32 variable");
359 if (xdr_getpos(&d_scalar_sink) !=
sizeof(dods_float32))
360 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float32 variable");
365 dods_int32 *i =
reinterpret_cast<dods_int32 *
>(&d_ieee754_buf);
368#ifdef USE_POSIX_THREADS
369 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
371 d_out.write(d_ieee754_buf,
sizeof(dods_float32));
377void D4StreamMarshaller::put_float64(dods_float64 val) {
378#if !USE_XDR_FOR_IEEE754_ENCODING
379 assert(std::numeric_limits<double>::is_iec559);
384#ifdef USE_POSIX_THREADS
385 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
387 d_out.write(
reinterpret_cast<const char *
>(&val),
sizeof(dods_float64));
393 if (std::numeric_limits<double>::is_iec559) {
394#ifdef USE_POSIX_THREADS
395 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
397 d_out.write(
reinterpret_cast<char *
>(&val),
sizeof(dods_float64));
400 if (!xdr_setpos(&d_scalar_sink, 0))
401 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 variable");
403 if (!xdr_double(&d_scalar_sink, &val))
404 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 variable");
406 if (xdr_getpos(&d_scalar_sink) !=
sizeof(dods_float64))
407 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 variable");
412 dods_int64 *i =
reinterpret_cast<dods_int64 *
>(&d_ieee754_buf);
416#ifdef USE_POSIX_THREADS
417 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
419 d_out.write(d_ieee754_buf,
sizeof(dods_float64));
425void D4StreamMarshaller::put_uint16(dods_uint16 val) {
426 checksum_update(&val,
sizeof(dods_uint16));
429#ifdef USE_POSIX_THREADS
430 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
432 d_out.write(
reinterpret_cast<char *
>(&val),
sizeof(dods_uint16));
436void D4StreamMarshaller::put_uint32(dods_uint32 val) {
437 checksum_update(&val,
sizeof(dods_uint32));
440#ifdef USE_POSIX_THREADS
441 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
443 d_out.write(
reinterpret_cast<char *
>(&val),
sizeof(dods_uint32));
447void D4StreamMarshaller::put_uint64(dods_uint64 val) {
448 checksum_update(&val,
sizeof(dods_uint64));
451#ifdef USE_POSIX_THREADS
452 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
454 d_out.write(
reinterpret_cast<char *
>(&val),
sizeof(dods_uint64));
467#ifdef USE_POSIX_THREADS
468 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
470 d_out.write(
reinterpret_cast<const char *
>(&count),
sizeof(int64_t));
473void D4StreamMarshaller::put_str(
const string &val) {
474 checksum_update(val.c_str(), val.length());
477 int64_t len = val.length();
478#ifdef USE_POSIX_THREADS
479 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
481 d_out.write(
reinterpret_cast<const char *
>(&len),
sizeof(int64_t));
482 d_out.write(val.data(), val.length());
486void D4StreamMarshaller::put_url(
const string &val) { put_str(val); }
488void D4StreamMarshaller::put_opaque_dap4(
const char *val, int64_t len) {
492 checksum_update(val, len);
495#ifdef USE_POSIX_THREADS
496 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
498 d_out.write(
reinterpret_cast<const char *
>(&len),
sizeof(int64_t));
500 char *byte_buf =
new char[len];
501 memcpy(byte_buf, val, len);
503 tm->increment_child_thread_count();
504 tm->start_thread(MarshallerThread::write_thread, d_out, byte_buf, len);
506 d_out.write(
reinterpret_cast<const char *
>(&len),
sizeof(int64_t));
507 d_out.write(val, len);
519 assert(num_bytes >= 0);
524#ifdef USE_POSIX_THREADS
525 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
527 char *buf =
new char[num_bytes];
528 memcpy(buf, val, num_bytes);
530 tm->increment_child_thread_count();
533 d_out.write(val, num_bytes);
540 assert(num_elem >= 0);
541 assert(elem_size > 0);
547 assert(!
"Don't call this method for bytes, use put_vector(val, bytes) instead");
552 assert(!(num_elem & 0x4000000000000000));
553 bytes = num_elem << 1;
556 assert(!(num_elem & 0x6000000000000000));
557 bytes = num_elem << 2;
560 assert(!(num_elem & 0x7000000000000000));
561 bytes = num_elem << 3;
564 bytes = num_elem * elem_size;
568 checksum_update(val, bytes);
571#ifdef USE_POSIX_THREADS
572 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
574 char *buf =
new char[bytes];
575 memcpy(buf, val, bytes);
577 tm->increment_child_thread_count();
578 tm->start_thread(MarshallerThread::write_thread, d_out, buf, bytes);
580 d_out.write(val, bytes);
595#if !USE_XDR_FOR_IEEE754_ENCODING
597 assert(std::numeric_limits<float>::is_iec559);
599 assert(num_elem >= 0);
604 assert(!(num_elem & 0xe000000000000000));
606 num_elem = num_elem << 2;
611#ifdef USE_POSIX_THREADS
612 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
614 char *buf =
new char[num_elem];
615 memcpy(buf, val, num_elem);
617 tm->increment_child_thread_count();
620 d_out.write(val, num_elem);
626 assert(num_elem >= 0);
631 assert(!(num_elem & 0xe000000000000000));
633 int64_t bytes = num_elem << 2;
638 if (!std::numeric_limits<float>::is_iec559) {
640 m_serialize_reals(val, num_elem, 4, type);
642#ifdef USE_POSIX_THREADS
643 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
645 char *buf =
new char[bytes];
646 memcpy(buf, val, bytes);
648 tm->increment_child_thread_count();
651 d_out.write(val, bytes);
667#if !USE_XDR_FOR_IEEE754_ENCODING
669 assert(std::numeric_limits<double>::is_iec559);
671 assert(num_elem >= 0);
673 assert(!(num_elem & 0xf000000000000000));
675 num_elem = num_elem << 3;
680#ifdef USE_POSIX_THREADS
681 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
683 char *buf =
new char[num_elem];
684 memcpy(buf, val, num_elem);
686 tm->increment_child_thread_count();
689 d_out.write(val, num_elem);
694 assert(num_elem >= 0);
699 assert(!(num_elem & 0xe000000000000000));
701 int64_t bytes = num_elem << 3;
706 if (!std::numeric_limits<double>::is_iec559) {
708 m_serialize_reals(val, num_elem, 8, type);
710#ifdef USE_POSIX_THREADS
711 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
713 char *buf =
new char[bytes];
714 memcpy(buf, val, bytes);
716 tm->increment_child_thread_count();
719 d_out.write(val, bytes);
765 throw InternalErr(__FILE__, __LINE__,
"Array of String should not be passed to put_vector.");
768 throw InternalErr(__FILE__, __LINE__,
"Array of Array not allowed.");
771 case dods_structure_c:
772 case dods_sequence_c:
773 throw InternalErr(__FILE__, __LINE__,
"Array of String should not be passed to put_vector.");
776 throw InternalErr(__FILE__, __LINE__,
"Grid is not part of DAP4.");
779 throw InternalErr(__FILE__, __LINE__,
"Unknown datatype.");
784 strm << DapIndent::LMarg <<
"D4StreamMarshaller::dump - (" << (
void *)
this <<
")" << endl;
virtual void put_checksum()
Write the checksum Write the checksum for the data sent since the last call to reset_checksum() to th...
virtual void reset_checksum()
virtual void put_vector_float32(char *val, int64_t num_elem)
Write a fixed size vector.
virtual void put_vector_part(char *, unsigned int, int, Type)
virtual void put_vector(char *val, int64_t num_bytes)
Write a fixed size vector.
virtual void dump(std::ostream &strm) const
dump the contents of this object to the specified ostream
virtual void put_count(int64_t count)
virtual void checksum_update(const void *data, unsigned long len)
virtual void put_vector_float64(char *val, int64_t num_elem)
Write a fixed size vector of float64s.
virtual string get_checksum()
A class for software fault reporting.
static void * write_thread(void *arg)
static xdrproc_t xdr_coder(const Type &t)
Returns a function used to encode elements of an array.
top level DAP object to house generic methods
Type
Identifies the data type.
bool is_host_big_endian()
Does this host use big-endian byte order?