How do I implement a database connection/select/close using RxJS observables

I'm using node-oracledb to connect to an Oracle database. The API returns its own promises, which can be cast to Promise<T> and therefore converted to Observable<T>.

Using Observables, I would like to:

  1. Open the database connection
  2. Select N records
  3. Close the database connection, even if #2 threw an exception.

Using the traditional, blocking, procedural way, it would be something like this:

try
{
    connection = Oracle.getConnection(...);
    resultSet = connection.execute("SELECT ... FROM ...");
}
catch (Exception)
{
    resultSet = EMPTY_RESULT;
}
finally
{
    if (connection)
        connection.close();
}
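
For comparison, the same three steps can be sketched without blocking by using async/await over promises. This is only an illustrative sketch: `SketchConnection`, `selectRows`, and the `connect` parameter are hypothetical stand-ins rather than node-oracledb types, and an empty array plays the role of EMPTY_RESULT.

```typescript
// Hypothetical stand-in for a promise-based connection (not the node-oracledb typings).
interface SketchConnection {
  execute(statement: string): Promise<string[]>;
  close(): Promise<void>;
}

// Open, select, and always close, mirroring the try/catch/finally above.
async function selectRows(
  connect: () => Promise<SketchConnection>,
  statement: string
): Promise<string[]> {
  let connection: SketchConnection | undefined;
  try {
    connection = await connect();
    return await connection.execute(statement);
  } catch (error) {
    return []; // plays the role of EMPTY_RESULT
  } finally {
    if (connection) {
      await connection.close(); // runs whether execute() resolved or rejected
    }
  }
}
```

If `connect()` itself rejects, `connection` stays undefined, so the `finally` block skips the close call.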

My attempts at writing this using Observables led to a lot of code and callbacks.

The protected method getConnection() is still pretty simple:

import * as Oracle from "oracledb";

protected getConnection() : Observable<IConnection>
{
    return OraUtil.from(Oracle.getConnection(this.parameters));
}

And so is the closeConnection() method. I used the promise directly here, to avoid even more code.

protected closeConnection(subscriber : Subscriber<IExecuteReturn>, connection : IConnection) : void
{
    connection.close()
        .then(() => subscriber.complete())
        .catch((error) => subscriber.error(error)); // pass the error on instead of dropping it
}

But the execute() method is where the trouble starts.

protected _execute(connection : IConnection, statement : string) : Observable<IExecuteReturn>
{
    return new Observable<IExecuteReturn>(
        (subscriber) => {
            OraUtil.from(connection.execute(statement)).subscribe(
                (result) => subscriber.next(result),
                (error) => {
                    subscriber.error(error);
                    this.closeConnection(subscriber, connection);
                },
                () => {
                    this.closeConnection(subscriber, connection);
                });
        });
}

public execute(statement : string) : Observable<IExecuteReturn>
{
    return this.getConnection().pipe(
        flatMap((connection) => this._execute(connection, statement))
    );
}
Eti asked Oct 24 '25 02:10



1 Answer

This is how I generally handle connection management. The core is the `using` observable creator, which accepts a resource factory as its first argument and an observable factory (the setup function) as its second.

using(() => ({ unsubscribe() { /* release the resource */ } }), resource => observableOf(resource))

The resource is an object with an unsubscribe method that gets called as part of unsubscription, so you can hide any cleanup logic there and effectively bind the lifecycle of an arbitrary object to the lifecycle of an observable.
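
To make that lifecycle binding concrete, here is a minimal, dependency-free sketch of the contract. It is not the RxJS implementation: `usingSketch`, `SketchSource`, `SketchObserver`, and `fakeConnection` are hypothetical names invented for this illustration, and a "source" is modelled as a plain function that takes an observer and returns a teardown.

```typescript
// Minimal stand-ins for illustration; this is NOT the RxJS implementation.
interface SketchObserver<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}
type Teardown = () => void;
type SketchSource<T> = (observer: SketchObserver<T>) => Teardown;
interface Resource { unsubscribe(): void; }

// Creates the resource on subscribe and releases it exactly once when the
// source completes, errors, or is unsubscribed by the consumer.
function usingSketch<T, R extends Resource>(
  resourceFactory: () => R,
  sourceFactory: (resource: R) => SketchSource<T>
): SketchSource<T> {
  return (observer) => {
    const resource = resourceFactory();
    let released = false;
    const release = (): void => {
      if (!released) {
        released = true;
        resource.unsubscribe();
      }
    };
    const stopInner = sourceFactory(resource)({
      next: (value) => { observer.next(value); },
      error: (err) => { observer.error(err); release(); },
      complete: () => { observer.complete(); release(); },
    });
    return () => { stopInner(); release(); };
  };
}

// Usage: the fake "connection" is closed even though the query fails.
const events: string[] = [];
const fakeConnection = (): Resource => ({
  unsubscribe: () => { events.push("closed"); },
});
const failingQuery: SketchSource<string> = (observer) => {
  observer.error(new Error("query failed"));
  return () => {};
};
usingSketch(fakeConnection, () => failingQuery)({
  next: (row) => { events.push(`row ${row}`); },
  error: () => { events.push("error"); },
  complete: () => { events.push("complete"); },
});
// events is now ["error", "closed"]
```

The `released` flag is what lets the same cleanup be reached from three paths (complete, error, consumer unsubscribe) without closing the resource twice.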

I hope the code below makes sense.

import * as Oracle from "oracledb";
import { mergeMap , ignoreElements} from 'rxjs/operators';
import { using } from 'rxjs/observable/using';
import { from as observableFrom } from 'rxjs/observable/from';
import { concat } from 'rxjs/observable/concat';
import { defer } from 'rxjs/observable/defer';
import { empty as observableEmpty } from 'rxjs/observable/empty';

class OracleConnection {
  constructor(parameters) {
    this.isClosed = false;
    this.connection = null;
    this.parameters = parameters;
  }

  setup() {
    return defer(() => Oracle.getConnection(this.parameters)
      // Store the connection inside the promise chain, because the observable
      // may be unsubscribed before the connection is established.
      .then(connection => {
        this.connection = connection;
        if (this.isClosed) { // close() was requested before the connection existed, so close it now
          this.terminate();
        }
        return connection;
      }));
  }

  close() {
    this.isClosed = true;
    if (this.connection !== null) {
      const connection = this.connection;
      this.connection = null;

      return observableFrom(connection.close())
        .pipe(ignoreElements()) // only propagate errors
    }

    return observableEmpty(); // connection already closed
  }
  
  terminate() {
    this.close().subscribe(/* handle error from connection close */);
  }

  unsubscribe() { // this will get called on observable unsubscribe
    if (!this.isClosed) {
      this.terminate();
    }
  }
}

class ConnectionManager {
  constructor(params) {
    this.params = params;
  }

  getConnection() {
    return using(() => new OracleConnection(this.params), oracleConnection => oracleConnection.setup())
  }
}

const manager = new ConnectionManager({ /* some params */ });

manager.getConnection()
  .pipe(
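    mergeMap(connection => concat(
      // defer both calls so that close() is only issued after execute() has finished
      defer(() => connection.execute('SELECT 1')),
      defer(() => connection.close()) // explicitly close connection
    )),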
    // alternatively
    // take(1) // to close connection automatically
  );

One nice thing you can do, for example, is easily retry the connection in case of failure:

manager.getConnection()
  .pipe(
    retry(3)
    ...
  );
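
To make the retry counting concrete: `retry(3)` resubscribes up to three times after the first failure, so the connection factory can run up to four times in total. The same counting is sketched below with plain promises and no dependencies; `retryAsync` and `flakyConnect` are hypothetical helpers for illustration, not RxJS or node-oracledb APIs.

```typescript
// Hypothetical helper: run `task`, and on rejection try again up to `retries` more times.
async function retryAsync<T>(task: () => Promise<T>, retries: number): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error; // remember the failure and loop again
    }
  }
  throw lastError; // every attempt failed
}

// Hypothetical connection factory that fails twice before succeeding.
let attempts = 0;
async function flakyConnect(): Promise<string> {
  attempts += 1;
  if (attempts < 3) {
    throw new Error(`connect attempt ${attempts} failed`);
  }
  return "connected";
}

const connected: Promise<string> = retryAsync(flakyConnect, 3);
```

Here `connected` resolves on the third call to `flakyConnect`, so the remaining retries go unused.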
m1ch4ls answered Oct 25 '25 15:10
