Home Reference Source Repository

src/impl/ConnectionImpl.js

import * as RequestPromise from 'request-promise';
import * as _ from 'lodash';
/** @namespace JSON */

import InfluxDBError from '~/InfluxDBError';
import WriteBuffer from '~/impl/WriteBuffer';
import ConnectionTracker from '~/impl/ConnectionTracker';
import DefaultConnectionOptions from '~/impl/DefaultConnectionOptions';

/**
 * @ignore
 */
class ConnectionImpl {
  constructor(options) {
    this.schemas = {};
    ConnectionImpl.validateteOptions(options);
    this.options = ConnectionImpl.prepareOptions(options);
    ConnectionImpl.validateSchemas(options.schema);
    this.schemas = ConnectionImpl.prepareSchemas(options.schema);
    // for convenience of the user ignore a slash at the end of the URL
    this.hostUrl = ConnectionImpl.stripTrailingSlashIfNeeded(this.options.hostUrl);
    // buffer for data points, new instance is created for every batch
    this.writeBuffer = new WriteBuffer(this.schemas, this.options.autoGenerateTimestamps);
    // timer handle used to flush the buffer when it becomes too old
    this.bufferFlushTimerHandle = null;
    // unique ID of this connection
    this.connectionId = ConnectionTracker.generateConnectionID();
    // becomes true when connection to InfluxDB gets verified either automatically or by
    // calling connect()
    this.connected = false;
    // becomes false after the user explicitly calls disconnect(); the connection will
    // not be usable without reconnecting
    this.disconnected = false;
  }

  static stripTrailingSlashIfNeeded(url) {
    if (url.endsWith('/')) return url.substring(0, url.length - 1);
    return url;
  }

  static validateSchemas(schemas) {
    _.forEach(schemas, (schema) => {
      if (!schema.measurement) {
        throw new InfluxDBError('Each data point schema must have "measurement" property defined');
      }
    });
  }

  // Copy the supplied schema so that it won't get affected by further modifications from
  // the user. Also convert tags to a map for faster access during serialization
  static prepareSchemas(schemas) {
    if (schemas) {
      const data = _.map(schemas, schema =>
        ({
          measurement: schema.measurement,
          schema: _.cloneDeep(schema),
          tagsDictionary: _.groupBy(schema.tags),
        }));
      return _.keyBy(data, 'measurement');
    }
    return {};
  }

  static validateteOptions(options) {
    if (!options.database) throw new InfluxDBError("'database' option must be specified");
  }

  static prepareOptions(options) {
    const results = {};
    Object.assign(results, DefaultConnectionOptions);
    Object.assign(results, options);
    return results;
  }

  // called by the connection tracker when the node process is exiting
  onProcessExit() {
    if (this.writeBuffer.batchSize > 0) {
      console.error('Warning: there are still buffered data points to be written into InfluxDB, ' +
          'but the process is about to exit. Forgot to call Connection.flush() ?');
    }
    this.writeBuffer.rejectWritePromises(new InfluxDBError('Can\'t write data points to InfluxDB, process is exiting'));
  }

  write(dataPoints, forceFlush) {
    try {
      if (!dataPoints) return this.writeEmptySetOfPoints(forceFlush);
      if (!Array.isArray(dataPoints)) {
        if (typeof dataPoints === 'object') {
          return this.write([dataPoints], forceFlush);
        }
        return Promise.reject(new InfluxDBError('Invalid arguments supplied'));
      }
      if (dataPoints.length === 0) return this.writeEmptySetOfPoints(forceFlush);
      return this.whenConnected(() =>
        this.writeWhenConnectedAndInputValidated(dataPoints, forceFlush));
    } catch (e) {
      return Promise.reject(e);
    }
  }

  writeEmptySetOfPoints(forceFlush) {
    if (forceFlush) {
      return this.flush();
    }
    return Promise.resolve();
  }

  writeWhenConnectedAndInputValidated(dataPoints, forceFlush) {
    const batchSizeLimitNotReached = this.options.batchSize > 0 &&
        (this.writeBuffer.batchSize + dataPoints.length < this.options.batchSize);
    const timeoutLimitNotReached = this.writeBuffer.firstWriteTimestamp === null ||
        (this.options.maximumWriteDelay > 0 &&
        (new Date().getTime() - this.writeBuffer.firstWriteTimestamp <
        this.options.maximumWriteDelay));

    this.writeBuffer.write(dataPoints);

    if (batchSizeLimitNotReached && timeoutLimitNotReached && !forceFlush) {
      // just write into the buffer
      return this.promiseBufferedWrite();
    }
    // write to InfluxDB now, but serialize submitted data points first
    return this.flush();
  }

  promiseBufferedWrite() {
    if (this.writeBuffer.firstWriteTimestamp === null) {
      this.writeBuffer.firstWriteTimestamp = new Date().getTime();
      ConnectionTracker.startTracking(this);
      this.scheduleFlush(() => {
        this.flush();
      }, this.options.maximumWriteDelay);
    }
    if (this.options.autoResolveBufferedWritePromises) {
      return Promise.resolve();
    }
    return this.writeBuffer.createPromiseToResolveOnFlush();
  }

  scheduleFlush(onFlush, delay) {
    if (this.bufferFlushTimerHandle === null) {
      this.bufferFlushTimerHandle = setTimeout(onFlush, delay);
    }
  }

  cancelFlushSchedule() {
    if (this.bufferFlushTimerHandle !== null) {
      clearTimeout(this.bufferFlushTimerHandle);
      this.bufferFlushTimerHandle = null;
    }
  }

  flush() {
    const url = `${this.hostUrl}/write?db=${this.options.database}`;
    const bodyBuffer = this.writeBuffer.stream.getContents();
    const flushedWriteBuffer = this.writeBuffer;
    // prevent sending empty requests to the db
    if (flushedWriteBuffer.batchSize === 0) return Promise.resolve();
    // from now on all writes will be redirected to a new buffer
    this.writeBuffer = new WriteBuffer(this.schemas, this.options.autoGenerateTimestamps);
    // prevent repeated flush call if flush invoked before expiration timeout
    this.cancelFlushSchedule();
    // shutdown hook doesn't need to track this connection any more
    ConnectionTracker.stopTracking(this);

    return new RequestPromise.Request({
      resolveWithFullResponse: true,
      url,
      method: 'POST',
      headers: { 'Content-Type': 'application/text' },
      body: bodyBuffer,
      auth: {
        user: this.options.username,
        pass: this.options.password,
      },
    }).then((result) => {
      if (result.statusCode >= 200 && result.statusCode < 400) {
        flushedWriteBuffer.resolveWritePromises();
        return Promise.resolve();
      }
      let message = `Influx db write failed ${result.statusCode}`;
      // add information returned by the server if possible
      try {
        message += `: ${JSON.parse(result.body).error}`;
      } catch (e) {
        // we append the message only if it can be parsed form the response
      }
      return this.onFlushError(flushedWriteBuffer, message, bodyBuffer.toString());
    }).catch((e) => {
      const message = `Cannot write data to InfluxDB, reason: ${e.message}`;
      return this.onFlushError(flushedWriteBuffer, message, bodyBuffer.toString());
    });
  }

  onFlushError(flushedWriteBuffer, message, data) {
    const error = new InfluxDBError(message);
    if (this.options.autoResolveBufferedWritePromises) {
      this.options.batchWriteErrorHandler(error, data);
    } else {
      flushedWriteBuffer.rejectWritePromises(error);
    }
    return Promise.reject(error);
  }

  executeQuery(query, database) {
    return this.executeRawQuery(query, database).then(ConnectionImpl.postProcessQueryResults);
  }

  executeRawQuery(query, database) {
    return this.whenConnected(() => this.executeInternalQuery(query, database));
  }

  executeInternalQuery(query, database) {
    const db = !database ? this.options.database : database;
    const url = `${this.hostUrl}/query?db=${encodeURIComponent(db)}&q=${encodeURIComponent(query)}`;
    return new RequestPromise.Request({
      resolveWithFullResponse: true,
      url,
      auth: {
        user: this.options.username,
        pass: this.options.password,
      },
    }).then((result) => {
      if (result.statusCode >= 200 && result.statusCode < 400) {
        const contentType = result.headers['content-type'];
        if (contentType === 'application/json') {
          const data = JSON.parse(result.body);
          if (data.results[0].error) {
            return Promise.reject(new InfluxDBError(data.results[0].error));
          }
          return Promise.resolve(data);
        }
        return Promise.reject(new InfluxDBError(`Unexpected result content-type: ${contentType}`));
      }
      const error = new InfluxDBError(`HTTP ${result.statusCode} communication error`);
      return Promise.reject(error);
    }).catch(
      e => Promise.reject(new InfluxDBError(`Cannot read data from InfluxDB, reason: ${e.message}`)));
  }

  static postProcessQueryResults(results) {
    const outcome = [];
    _.forEach(results.results, (queryResult) => {
      _.forEach(queryResult.series, (series) => {
        // use for loops form now on to get better performance
        for (const values of series.values) {
          const result = {};
          let i = 0;
          for (const columnName of series.columns) {
            if (columnName === 'time') {
              try {
                result[columnName] = new Date(values[i]);
              } catch (e) {
                result[columnName] = values[i];
              }
            } else {
              result[columnName] = values[i];
            }
            i += 1;
          }
          if (series.tags) Object.assign(result, series.tags);
          outcome.push(result);
        }
      });
    });
    return Promise.resolve(outcome);
  }

  connect() {
    if (this.options.autoCreateDatabase) {
      // This works because create database operation is idempotent; unfortunately,
      // SHOW DATABASES requires the same admin permissions as CREATE DATABASE. Therefore
      // there is no point in trying to list the databases first and checking for the one
      // the user is trying to use.
      return this.executeInternalQuery(`CREATE DATABASE ${this.options.database}`).then(() => {
        this.connected = true;
        this.disconnected = false;
      });
    }
    const result = this.executeInternalQuery('SHOW DATABASES').then((databases) => {
      this.connected = this.doesDatabaseExists(databases);
      this.disconnected = !this.connected;
      if (!this.connected) {
        return result.reject(new InfluxDBError(`Database '${this.options.database}' does not exist`));
      }
      return Promise.resolve();
    }).catch(() => {
      // When user authentication is in use, SHOW DATABASES will fail due to insufficient user
      // privileges Therefore we try if the server is alive at least...
      const url = `${this.hostUrl}/ping`;
      return new RequestPromise.Request({ uri: url }).then(() => {
        this.connected = true;
        this.disconnected = false;
      }).catch(e => Promise.reject(
        new InfluxDBError(`Unable to contact InfluxDB, ping operation on '${url}' failed, reason: ${e.message}`)),
      );
    });
    return result;
  }

  disconnect() {
    const result = this.flush();
    this.connected = false;
    this.disconnected = true;
    return result;
  }

  doesDatabaseExists(showDatabasesResult) {
    // there is always _internal database available/visible
    // noinspection JSUnresolvedVariable - we don't want to document InfluxDB result format here
    const values = showDatabasesResult.results[0].series[0].values;
    // this is a lodash problem; it doesn't declare string parameter but it is documented
    // noinspection JSCheckFunctionSignatures
    return _.findIndex(values, this.options.database) >= 0;
  }

  whenConnected(action) {
    try {
      if (this.disconnected) {
        return Promise.reject(new InfluxDBError('Attempt to use a disconnected connection detected'));
      }
      if (!this.connected) {
        return this.connect().then(action);
      }
      return action();
    } catch (e) {
      return Promise.reject(e);
    }
  }
}


export default ConnectionImpl;