Home Reference Source Repository

src/impl/WriteBuffer.js

import * as StreamBuffers from 'stream-buffers';
import * as _ from 'lodash';

import InfluxDBError from '~/InfluxDBError';
import FieldType from '~/Field';

/**
 * Represents a single data point batch buffer written into into the database. It provides a write
 * method to add new points and a getContent() method to retrieve the content of the buffer.
 *
 * @ignore
 */
class WriteBuffer {
  constructor(schemas, autoGenerateTimestamps) {
    this.schemas = schemas;
    this.autoGenerateTimestamps = autoGenerateTimestamps;
    this.stream = new StreamBuffers.WritableStreamBuffer();
    this.firstWriteTimestamp = null;
    this.batchSize = 0;
    // stores promise resolve/reject functions for writes that are buffered so that these write
    // promises are resolved/rejected when the buffer is flushed. The values are objects with
    // resolve/reject properties holding the the promise reject/resolve functions
    this.writePromises = [];
  }

  createPromiseToResolveOnFlush() {
    return new Promise((resolve, reject) => {
      this.writePromises.push({ resolve, reject });
    });
  }

  resolveWritePromises() {
    _.forEach(this.writePromises, (promise) => { promise.resolve(); });
  }

  rejectWritePromises(error) {
    _.forEach(this.writePromises, (promise) => { promise.reject(error); });
  }

  write(dataPoints) {
    _.forEach(dataPoints, (dataPoint) => {
      this.validateDataPoint(dataPoint);
    });
    _.forEach(dataPoints, (dataPoint) => {
      this.serializeDataPoint(dataPoint);
      this.batchSize += 1;
    });
  }

  validateDataPoint(dataPoint) {
    this.validateTags(dataPoint);
    this.validateFields(dataPoint);
    WriteBuffer.validateTimestamp(dataPoint);
  }

  validateTags(dataPoint) {
    if (Array.isArray(dataPoint.tags)) {
      _.forEach(dataPoint.tags, (tag) => {
        if ((typeof tag) !== 'object') {
          throw new InfluxDBError('When defining tags as an array, all array members must be objects' +
              ` with key and value properties: Measurement: ${dataPoint.measurement}`);
        }
        if (!tag.key) {
          throw new InfluxDBError(`When defining tags as objects, key property must be supplied. Measurement: ${dataPoint.measurement}`);
        }
        this.validateTagValue(tag.value, tag.key, dataPoint);
      });
    } else if ((typeof dataPoint.tags) === 'object') {
      _.forOwn(dataPoint.tags, (tagValue, tagKey) => {
        this.validateTagValue(tagValue, tagKey, dataPoint);
      });
    } else if (dataPoint.tags) {
      throw new InfluxDBError('Datapoint tags must be supplied as an array or object');
    }
  }

  validateTagValue(value, tagName, dataPoint) {
    const schema = this.schemas[dataPoint.measurement];
    if ((typeof value) !== 'string') throw new InfluxDBError('Invalid tag value type, must be a string');
    if (schema && schema.tagsDictionary && !schema.tagsDictionary[tagName]) {
      throw new InfluxDBError(`Tag value '${value}' is not allowed for measurement ` +
          `${dataPoint.measurement} based on schema.`);
    }
  }

  validateFields(dataPoint) {
    let fieldsDefined = false;
    if (!dataPoint.fields) WriteBuffer.reportMissingFields(dataPoint);
    if (Array.isArray(dataPoint.fields)) {
      _.forEach(dataPoint.fields, (field) => {
        if ((typeof field.key) !== 'string') {
          throw new InfluxDBError(`Field key must be a string, measurement: '${dataPoint.measurement}'`);
        }
        if (field.value != null) {
          this.validateFieldValue(field.value, field.key, dataPoint);
          fieldsDefined = true;
        }
      });
    } else if ((typeof dataPoint.fields) === 'object') {
      _.forOwn(dataPoint.fields, (fieldValue, fieldKey) => {
        if (fieldValue != null) {
          this.validateFieldValue(fieldValue, fieldKey, dataPoint);
          fieldsDefined = true;
        }
      });
    } else {
      throw new InfluxDBError('Data point fields property must be an array or an object');
    }

    if (!fieldsDefined) WriteBuffer.reportMissingFields(dataPoint);
  }

  static reportMissingFields(dataPoint) {
    throw new InfluxDBError(`Data point has no fields in measurement '${dataPoint.measurement}'`);
  }

  validateFieldValue(value, fieldName, dataPoint) {
    const schema = this.getSchemaRecord(dataPoint.measurement);
    const userSpecifiedType = WriteBuffer.getUserSpecifiedType(schema, fieldName);
    if (schema && schema.fields && userSpecifiedType === null) {
      throw new InfluxDBError(`Field ${fieldName} is not declared in the schema` +
          ` for measurement ${dataPoint.measurement}`);
    }
    if (userSpecifiedType) {
      switch (userSpecifiedType) {
        case FieldType.STRING:
          WriteBuffer.validateType('string', typeof value, fieldName, dataPoint);
          return;
        case FieldType.BOOLEAN:
          WriteBuffer.validateType('boolean', typeof value, fieldName, dataPoint);
          return;
        case FieldType.FLOAT:
          WriteBuffer.validateType('number', typeof value, fieldName, dataPoint);
          return;
        case FieldType.INTEGER:
          WriteBuffer.validateType('number', typeof value, fieldName, dataPoint);
          WriteBuffer.validateInteger(value, fieldName, dataPoint);
          return;
        default:
      }
    } else {
      switch (typeof value) {
        case 'string':
        case 'boolean':
        case 'number':
          return;
        default:
      }
    }
    throw new InfluxDBError(`Unsupported value type:${(typeof value)}`);
  }

  static getUserSpecifiedType(schema, fieldKey) {
    if (schema && schema.fields) {
      return schema.fields[fieldKey];
    }
    return undefined;
  }

  static validateType(expectedType, givenType, fieldName, dataPoint) {
    if (givenType !== expectedType) {
      throw new InfluxDBError(`Invalid type supplied for field '${fieldName}' of ` +
          `measurement '${dataPoint.measurement}.' ` +
          `Supplied '${givenType}' but '${expectedType}' is required`);
    }
  }

  static validateInteger(value, fieldName, dataPoint) {
    if (value !== Math.floor(value)) {
      throw new InfluxDBError(`Invalid value supplied for field '${fieldName}' of ` +
          `measurement '${dataPoint.measurement}'. ` +
          'Should have been an integer but supplied number has a fraction part.');
    }
  }

  static validateTimestamp(dataPoint) {
    const timestamp = dataPoint.timestamp;
    switch (typeof timestamp) {
      case 'string':
      case 'number':
      case 'undefined':
        break;
      case 'object':
        if ((typeof timestamp.getTime) !== 'function') {
          throw new InfluxDBError('Timestamp must be an instance of Date');
        }
        break;
      default:
        throw new InfluxDBError(`Unsupported timestamp type: ${typeof timestamp}`);
    }
  }

  serializeDataPoint(dataPoint) {
    const outputStream = this.stream;
    outputStream.write(WriteBuffer.escapeMeasurementName(dataPoint.measurement));
    if (dataPoint.tags) {
      outputStream.write(',');
      this.serializeTags(dataPoint);
    }
    outputStream.write(' ');
    this.serializeFields(dataPoint);
    outputStream.write(' ');
    outputStream.write(WriteBuffer.serializeTimestamp(dataPoint.timestamp));
    outputStream.write('\n');
  }

  serializeTags(dataPoint) {
    const outputStream = this.stream;
    if (Array.isArray(dataPoint.tags)) {
      _.forEach(dataPoint.tags, (tag) => {
        outputStream.write(WriteBuffer.escape(tag.key));
        outputStream.write('=');
        outputStream.write(WriteBuffer.escape(tag.value));
      });
    } else if ((typeof dataPoint.tags) === 'object') {
      _.forOwn(dataPoint.tags, (tagValue, tagKey) => {
        outputStream.write(WriteBuffer.escape(tagKey));
        outputStream.write('=');
        outputStream.write(WriteBuffer.escape(tagValue));
      });
    }
  }

  serializeFields(dataPoint) {
    const schema = this.getSchemaRecord(dataPoint.measurement);
    const outputStream = this.stream;
    if (Array.isArray(dataPoint.fields)) {
      _.forEach(dataPoint.fields, (field) => {
        // do not serialize fields with null & undefined values
        if (field.value != null) {
          const userSpecifiedType = WriteBuffer.getUserSpecifiedType(schema, field.key);
          outputStream.write(WriteBuffer.escape(field.key));
          outputStream.write('=');
          outputStream.write(WriteBuffer.serializeFieldValue(field.value, userSpecifiedType));
        }
      });
    } else if ((typeof dataPoint.fields) === 'object') {
      _.forOwn(dataPoint.fields, (fieldValue, fieldKey) => {
        // do not serialize fields with null & undefined values
        if (fieldValue != null) {
          const userSpecifiedType = WriteBuffer.getUserSpecifiedType(schema, fieldKey);
          outputStream.write(WriteBuffer.escape(fieldKey));
          outputStream.write('=');
          outputStream.write(WriteBuffer.serializeFieldValue(fieldValue, userSpecifiedType));
        }
      });
    }
  }

  static serializeFieldValue(value, userSpecifiedType) {
    if (userSpecifiedType) {
      switch (userSpecifiedType) {
        case FieldType.STRING:
          return `"${WriteBuffer.escapeStringFieldValue(value)}"`;
        case FieldType.BOOLEAN:
          return value ? 'T' : 'F';
        case FieldType.FLOAT:
          return value.toString();
        case FieldType.INTEGER:
          return `${value}i`;
        default:
      }
    } else {
      switch (typeof value) {
        case 'string':
          return `"${WriteBuffer.escapeStringFieldValue(value)}"`;
        case 'boolean':
          return value ? 'T' : 'F';
        case 'number':
          return value.toString();
        default:
      }
    }
    throw new InfluxDBError(`Unsupported field value type: ${typeof value}`);
  }

  getSchemaRecord(measurement) {
    const schemaRecord = this.schemas[measurement];
    return schemaRecord ? schemaRecord.schema : undefined;
  }

  /*
   * Escape tag keys, tag values, and field keys
   * see https://docs.influxdata.com/influxdb/latest/write_protocols/line_protocol_reference/#special-characters
   */
  static escape(s) {
    return s.replace(/([,= ])/g, '\\$1');
  }

  static escapeMeasurementName(s) {
    return s.replace(/([ ,])/g, '\\$1');
  }

  static escapeStringFieldValue(s) {
    return s.replace(/(["])/g, '\\$1');
  }

  static serializeTimestamp(timestamp) {
    switch (typeof timestamp) {
      case 'string':
        return timestamp;
      case 'object':
        return WriteBuffer.convertMsToNs(timestamp.getTime());
      case 'number':
        return WriteBuffer.convertMsToNs(timestamp);
      case 'undefined':
        return this.autoGenerateTimestamps ? WriteBuffer.convertMsToNs(new Date().getTime()) : '';
      default:
        throw new InfluxDBError('Assertion failed.');
    }
  }

  // convert number/string in unix ms format into nanoseconds as required by InfluxDB
  static convertMsToNs(ms) {
    return `${ms}000000`;
  }
}

export default WriteBuffer;