Build a Python App with CockroachDB and psycopg2

Tip:

Take our newest Cockroach University course, CockroachDB for Python Developers.

This tutorial shows you how to build a simple Python application with CockroachDB and the psycopg2 driver. For the CockroachDB back-end, you'll use a temporary local cluster.

Step 1. Install the psycopg2 driver

To install the Python psycopg2 driver, run the following command:

$ pip install psycopg2-binary

For other ways to install psycopg2, see the official documentation.

Step 2. Start CockroachDB

  1. If you haven't already, download the CockroachDB binary.
  2. Run the cockroach demo command:

    $ cockroach demo \
    --empty
    

    This starts a temporary, in-memory cluster and opens an interactive SQL shell to the cluster.

  3. Take note of the (sql/tcp) connection string in the SQL shell welcome text:

    # Connection parameters:
    #   (console) http://127.0.0.1:61009
    #   (sql)     postgres://root:admin@?host=%2Fvar%2Ffolders%2Fk1%2Fr048yqpd7_9337rgxm9vb_gw0000gn%2FT%2Fdemo255013852&port=26257
    #   (sql/tcp) postgres://root:admin@127.0.0.1:61011?sslmode=require    
    

    You will use it in your application code later.

Step 3. Create a database

  1. In the SQL shell, create the bank database that your application will use:

    > CREATE DATABASE bank;
    
  2. Create a SQL user for your app:

    > CREATE USER <username> WITH PASSWORD <password>;
    

    Take note of the username and password. You will use them in your application code later.

  3. Give the user the necessary permissions:

    > GRANT ALL ON DATABASE bank TO <username>;
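
As a rough sketch, the connection string for the application can be assembled from the (sql/tcp) string noted in Step 2 and the user created above. The helper name build_app_dsn and the credentials maxroach / secret are hypothetical placeholders, not part of the tutorial's code; only Python's standard urllib.parse module is used.

```python
from urllib.parse import quote, urlsplit, urlunsplit


def build_app_dsn(demo_url, username, password, database="bank"):
    """Swap in the app user and database on a demo (sql/tcp) URL.

    Hypothetical helper for illustration; not part of example.py.
    """
    parts = urlsplit(demo_url)
    # Percent-encode the credentials so special characters stay valid in a URL.
    credentials = f"{quote(username, safe='')}:{quote(password, safe='')}"
    netloc = f"{credentials}@{parts.hostname}:{parts.port}"
    # Keep the original query string (for example, sslmode=require).
    return urlunsplit(("postgresql", netloc, f"/{database}", parts.query, ""))


# The (sql/tcp) string from the demo welcome text shown in Step 2.
demo_url = "postgres://root:admin@127.0.0.1:61011?sslmode=require"
dsn = build_app_dsn(demo_url, "maxroach", "secret")
print(dsn)
# postgresql://maxroach:secret@127.0.0.1:61011/bank?sslmode=require
```

The port changes each time cockroach demo starts, so the string has to be rebuilt from the current welcome text.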
    

Step 4. Run the Python code

Now that you have a database, you'll run the code shown below to:

  • Create an accounts table and insert some rows.
  • Transfer funds between two accounts inside a transaction.
  • Delete the accounts from the table before exiting so you can re-run the example code.

To handle transaction retry errors, the code uses an application-level retry loop that, in case of error, sleeps before trying the funds transfer again. If it encounters another retry error, it sleeps for a longer interval, implementing exponential backoff.
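
To make the backoff concrete, the sketch below computes the range of sleep intervals for each retry, using the same formula that appears in the example code's run_transaction function. The helper name backoff_seconds and its jitter parameter are assumptions added here so the bounds can be shown deterministically.

```python
import random


def backoff_seconds(retry, jitter=None):
    """Sleep interval in seconds for the given retry number (1-based).

    Hypothetical helper: the formula matches the example code, but the
    jitter argument exists only to make the bounds easy to inspect.
    """
    if jitter is None:
        jitter = random.random()  # uniform in [0.0, 1.0)
    return (2 ** retry) * 0.1 * (jitter + 0.5)


for retry in range(1, 4):
    low = backoff_seconds(retry, jitter=0.0)
    high = backoff_seconds(retry, jitter=1.0)  # upper bound, never quite reached
    print(f"retry {retry}: between {low:.1f} and {high:.1f} seconds")
# retry 1: between 0.1 and 0.3 seconds
# retry 2: between 0.2 and 0.6 seconds
# retry 3: between 0.4 and 1.2 seconds
```

The interval doubles with each retry, and the random factor spreads out clients that would otherwise retry at the same moment.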

Get the code

Download the example.py file, or create the file yourself and copy the code into it.

If you prefer, you can also clone a version of the code:

$ git clone https://github.com/cockroachlabs/hello-world-python-psycopg2/

The example.py file contains the following code:

#!/usr/bin/env python3
"""
Test psycopg with CockroachDB.
"""

import time
import random
import logging
from argparse import ArgumentParser, RawTextHelpFormatter

import psycopg2
from psycopg2.errors import SerializationFailure


def create_accounts(conn):
    with conn.cursor() as cur:
        cur.execute(
            "CREATE TABLE IF NOT EXISTS accounts (id INT PRIMARY KEY, balance INT)"
        )
        cur.execute("UPSERT INTO accounts (id, balance) VALUES (1, 1000), (2, 250)")
        logging.debug("create_accounts(): status message: %s", cur.statusmessage)
    conn.commit()


def delete_accounts(conn):
    with conn.cursor() as cur:
        cur.execute("DELETE FROM bank.accounts")
        logging.debug("delete_accounts(): status message: %s", cur.statusmessage)
    conn.commit()


def print_balances(conn):
    with conn.cursor() as cur:
        cur.execute("SELECT id, balance FROM accounts")
        logging.debug("print_balances(): status message: %s", cur.statusmessage)
        rows = cur.fetchall()
        conn.commit()
        print(f"Balances at {time.asctime()}:")
        for row in rows:
            print(row)


def transfer_funds(conn, frm, to, amount):
    with conn.cursor() as cur:

        # Check the current balance.
        cur.execute("SELECT balance FROM accounts WHERE id = %s", (frm,))
        from_balance = cur.fetchone()[0]
        if from_balance < amount:
            raise RuntimeError(
                f"Insufficient funds in {frm}: have {from_balance}, need {amount}"
            )

        # Perform the transfer.
        cur.execute(
            "UPDATE accounts SET balance = balance - %s WHERE id = %s", (amount, frm)
        )
        cur.execute(
            "UPDATE accounts SET balance = balance + %s WHERE id = %s", (amount, to)
        )

    conn.commit()
    logging.debug("transfer_funds(): status message: %s", cur.statusmessage)


def run_transaction(conn, op, max_retries=3):
    """
    Execute the operation *op(conn)*, retrying on serialization failure.

    If the database returns an error asking to retry the transaction, retry it
    *max_retries* times before giving up (and propagate it).
    """
    # leaving this block the transaction will commit or rollback
    # (if leaving with an exception)
    with conn:
        for retry in range(1, max_retries + 1):
            try:
                op(conn)

                # If we reach this point, we were able to commit, so we break
                # from the retry loop.
                return

            except SerializationFailure as e:
                # This is a retry error, so we roll back the current
                # transaction and sleep for a bit before retrying. The
                # sleep time increases for each failed transaction.
                logging.debug("got error: %s", e)
                conn.rollback()
                logging.debug("EXECUTE SERIALIZATION_FAILURE BRANCH")
                sleep_seconds = (2 ** retry) * 0.1 * (random.random() + 0.5)
                logging.debug("Sleeping %s seconds", sleep_seconds)
                time.sleep(sleep_seconds)

            except psycopg2.Error as e:
                logging.debug("got error: %s", e)
                logging.debug("EXECUTE NON-SERIALIZATION_FAILURE BRANCH")
                raise e

        raise ValueError(f"Transaction did not succeed after {max_retries} retries")


def test_retry_loop(conn):
    """
    Cause a serialization error in the connection.

    This function can be used to test retry logic.
    """
    with conn.cursor() as cur:
        # The first statement in a transaction can be retried transparently on
        # the server, so we need to add a dummy statement so that our
        # force_retry() statement isn't the first one.
        cur.execute("SELECT now()")
        cur.execute("SELECT crdb_internal.force_retry('1s'::INTERVAL)")
    logging.debug("test_retry_loop(): status message: %s", cur.statusmessage)


def main():
    opt = parse_cmdline()
    logging.basicConfig(level=logging.DEBUG if opt.verbose else logging.INFO)

    conn = psycopg2.connect(opt.dsn)
    create_accounts(conn)
    print_balances(conn)

    amount = 100
    fromId = 1
    toId = 2

    try:
        run_transaction(conn, lambda conn: transfer_funds(conn, fromId, toId, amount))

        # The function below is used to test the transaction retry logic.  It
        # can be deleted from production code.
        # run_transaction(conn, test_retry_loop)
    except ValueError as ve:
        # Below, we print the error and continue on so this example is easy to
        # run (and run, and run...).  In real code you should handle this error
        # and any others thrown by the database interaction.
        logging.debug("run_transaction(conn, op) failed: %s", ve)

    print_balances(conn)

    delete_accounts(conn)

    # Close communication with the database.
    conn.close()


def parse_cmdline():
    parser = ArgumentParser(description=__doc__,
                            formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "dsn",
        help="""\
database connection string

For cockroach demo, use
'postgresql://<username>:<password>@<hostname>:<port>/bank?sslmode=require',
with the username and password created in the demo cluster, and the hostname
and port listed in the (sql/tcp) connection parameters of the demo cluster
welcome message.

For CockroachCloud Free, use
'postgres://<username>:<password>@free-tier.gcp-us-central1.cockroachlabs.cloud:26257/<cluster-name>.bank?sslmode=verify-full&sslrootcert=<your_certs_directory>/cc-ca.crt'.

If you are using the connection string copied from the Console, your username,
password, and cluster name will be pre-populated. Replace
<your_certs_directory> with the path to the 'cc-ca.crt' downloaded from the
Console.

"""
    )

    parser.add_argument("-v", "--verbose",
                        action="store_true", help="print debug info")

    opt = parser.parse_args()
    return opt


if __name__ == "__main__":
    main()
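
To see the control flow of the retry loop without a running cluster, here is a simplified, database-free sketch. FakeRetryError stands in for SerializationFailure, and run_with_retries, flaky_transfer, and the injectable sleep parameter are hypothetical names introduced only for illustration. The sketch leaves out the jitter, the rollback, and the "with conn:" block that the real run_transaction uses.

```python
class FakeRetryError(Exception):
    """Stand-in for psycopg2.errors.SerializationFailure (illustration only)."""


def run_with_retries(op, max_retries=3, sleep=lambda seconds: None):
    """Call op(), retrying on FakeRetryError with a doubling delay."""
    for retry in range(1, max_retries + 1):
        try:
            return op()
        except FakeRetryError:
            # The real code also rolls back the transaction here.
            sleep((2 ** retry) * 0.1)
    raise ValueError(f"Transaction did not succeed after {max_retries} retries")


attempts = []


def flaky_transfer():
    """Fail twice with a retry error, then succeed on the third call."""
    attempts.append(1)
    if len(attempts) < 3:
        raise FakeRetryError("restart transaction")
    return "committed"


delays = []  # records the requested sleep intervals instead of sleeping
result = run_with_retries(flaky_transfer, sleep=delays.append)
print(result, len(attempts), delays)
```

An operation that never succeeds exhausts max_retries and ends in the ValueError that main() catches.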

Run the code

The Python code is a command-line utility that accepts the connection string to CockroachDB as a command-line argument. Before running the code, update the connection string as follows:

  • Replace <username> and <password> with the SQL username and password that you created earlier.
  • Replace <hostname> and <port> with the hostname and port in the (sql/tcp) connection string from the SQL shell welcome text.

Then run the code:

$ python3 example.py \
'postgresql://<username>:<password>@<hostname>:<port>/bank?sslmode=require'

The output should show the account balances before and after the funds transfer:

Balances at Fri Oct 30 18:27:00 2020:
(1, 1000)
(2, 250)
Balances at Fri Oct 30 18:27:00 2020:
(1, 900)
(2, 350)

What's next?

Read more about using the Python psycopg2 driver.
