This tutorial shows you how to build a simple Python application with CockroachDB and the psycopg2 driver. For the CockroachDB back-end, you'll use a temporary local cluster.
Step 1. Install the psycopg2 driver
To install the Python psycopg2 driver, run the following command:
$ pip install psycopg2-binary
For other ways to install psycopg2, see the official documentation.
Step 2. Start CockroachDB
- If you haven't already, download the CockroachDB binary.
- Run the cockroach demo command:
$ cockroach demo \
    --empty
This starts a temporary, in-memory cluster and opens an interactive SQL shell to the cluster.
Take note of the (sql/tcp) connection string in the SQL shell welcome text:
# Connection parameters:
#   (console) http://127.0.0.1:61009
#   (sql)     postgres://root:admin@?host=%2Fvar%2Ffolders%2Fk1%2Fr048yqpd7_9337rgxm9vb_gw0000gn%2FT%2Fdemo255013852&port=26257
#   (sql/tcp) postgres://root:admin@127.0.0.1:61011?sslmode=require
You will use it in your application code later.
Step 3. Create a database
In the SQL shell, create the bank database that your application will use:
> CREATE DATABASE bank;
Create a SQL user for your app:
> CREATE USER <username> WITH PASSWORD <password>;
Take note of the username and password. You will use them in your application code later.
Give the user the necessary permissions:
> GRANT ALL ON DATABASE bank TO <username>;
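With the user created, the application's connection string can be assembled from the (sql/tcp) string noted in Step 2. The following is a minimal sketch rather than part of the example program: the helper name build_app_dsn and the credentials maxroach / s3cret are hypothetical placeholders, and it assumes the (sql/tcp) string has the same shape as the one shown in the welcome text.

```python
from urllib.parse import quote, urlsplit, urlunsplit


def build_app_dsn(demo_url, username, password, database="bank"):
    """Swap in the app user's credentials and database on a demo (sql/tcp) URL.

    Hypothetical helper for illustration; it keeps the host, port, and
    query string (for example sslmode=require) from the demo URL.
    """
    parts = urlsplit(demo_url)
    # Percent-encode the credentials so characters such as '@' or ':' are safe.
    netloc = (
        f"{quote(username, safe='')}:{quote(password, safe='')}"
        f"@{parts.hostname}:{parts.port}"
    )
    return urlunsplit(("postgresql", netloc, f"/{database}", parts.query, ""))


# The (sql/tcp) string copied from the welcome text; your port will differ.
demo_url = "postgres://root:admin@127.0.0.1:61011?sslmode=require"
dsn = build_app_dsn(demo_url, "maxroach", "s3cret")
print(dsn)
```

The result has the same form as the connection string that the example program expects on its command line.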
Step 4. Run the Python code
Now that you have a database, you'll run the code shown below to:
- Create an accounts table and insert some rows.
- Transfer funds between two accounts inside a transaction.
- Delete the accounts from the table before exiting so you can re-run the example code.
To handle transaction retry errors, the code uses an application-level retry loop that, in case of error, sleeps before trying the funds transfer again. If it encounters another retry error, it sleeps for a longer interval, implementing exponential backoff.
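The retry pattern described above can be tried without a database. The sketch below is an illustration, not the tutorial's code: RetryableError is a hypothetical stand-in for psycopg2's SerializationFailure, and the sleep and rand parameters are added only so the loop can be exercised without actually waiting. The delay formula matches the one used in the example program.

```python
import random
import time


class RetryableError(Exception):
    """Hypothetical stand-in for psycopg2.errors.SerializationFailure."""


def backoff_delay(retry, jitter):
    """Seconds to sleep after failed attempt number `retry` (1-based).

    The base delay doubles with every retry; `jitter` in [0, 1) scales it
    by a factor between 0.5 and 1.5 so concurrent clients spread out.
    """
    return (2 ** retry) * 0.1 * (jitter + 0.5)


def retry_with_backoff(op, max_retries=3, sleep=time.sleep, rand=random.random):
    """Call op(), sleeping for a growing interval after each retryable error."""
    for retry in range(1, max_retries + 1):
        try:
            return op()
        except RetryableError:
            sleep(backoff_delay(retry, rand()))
    raise ValueError(f"Operation did not succeed after {max_retries} retries")


# Usage: an operation that fails twice, then succeeds on the third attempt.
attempts = []
sleeps = []


def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise RetryableError("retry me")
    return "committed"


# Record the delays instead of sleeping so the demo finishes immediately.
result = retry_with_backoff(flaky, sleep=sleeps.append)
print(result, sleeps)
```

The first recorded delay falls between 0.1 and 0.3 seconds and the second between 0.2 and 0.6 seconds, which is the doubling behavior the paragraph above describes.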
Get the code
Download the example.py file, or create the file yourself and copy the code into it.
If you prefer, you can also clone a version of the code:
$ git clone https://github.com/cockroachlabs/hello-world-python-psycopg2/
#!/usr/bin/env python3
"""
Test psycopg with CockroachDB.
"""

import time
import random
import logging
from argparse import ArgumentParser, RawTextHelpFormatter

import psycopg2
from psycopg2.errors import SerializationFailure

def create_accounts(conn):
    with conn.cursor() as cur:
        cur.execute(
            "CREATE TABLE IF NOT EXISTS accounts (id INT PRIMARY KEY, balance INT)"
        )
        cur.execute("UPSERT INTO accounts (id, balance) VALUES (1, 1000), (2, 250)")
        logging.debug("create_accounts(): status message: %s", cur.statusmessage)
    conn.commit()

def delete_accounts(conn):
    with conn.cursor() as cur:
        cur.execute("DELETE FROM bank.accounts")
        logging.debug("delete_accounts(): status message: %s", cur.statusmessage)
    conn.commit()

def print_balances(conn):
    with conn.cursor() as cur:
        cur.execute("SELECT id, balance FROM accounts")
        logging.debug("print_balances(): status message: %s", cur.statusmessage)
        rows = cur.fetchall()
        conn.commit()
        print(f"Balances at {time.asctime()}:")
        for row in rows:
            print(row)

def transfer_funds(conn, frm, to, amount):
    with conn.cursor() as cur:

        # Check the current balance.
        cur.execute("SELECT balance FROM accounts WHERE id = %s", (frm,))
        from_balance = cur.fetchone()[0]
        if from_balance < amount:
            raise RuntimeError(
                f"Insufficient funds in {frm}: have {from_balance}, need {amount}"
            )

        # Perform the transfer.
        cur.execute(
            "UPDATE accounts SET balance = balance - %s WHERE id = %s", (amount, frm)
        )
        cur.execute(
            "UPDATE accounts SET balance = balance + %s WHERE id = %s", (amount, to)
        )

    conn.commit()
    logging.debug("transfer_funds(): status message: %s", cur.statusmessage)

def run_transaction(conn, op, max_retries=3):
    """
    Execute the operation *op(conn)*, retrying on serialization failure.

    If the database returns an error asking to retry the transaction, retry it
    *max_retries* times before giving up (and propagate it).
    """
    # leaving this block the transaction will commit or rollback
    # (if leaving with an exception)
    with conn:
        for retry in range(1, max_retries + 1):
            try:
                op(conn)

                # If we reach this point, we were able to commit, so we break
                # from the retry loop.
                return

            except SerializationFailure as e:
                # This is a retry error, so we roll back the current
                # transaction and sleep for a bit before retrying. The
                # sleep time increases for each failed transaction.
                logging.debug("got error: %s", e)
                conn.rollback()
                logging.debug("EXECUTE SERIALIZATION_FAILURE BRANCH")
                # time.sleep() takes seconds, so name the value accordingly.
                sleep_seconds = (2 ** retry) * 0.1 * (random.random() + 0.5)
                logging.debug("Sleeping %s seconds", sleep_seconds)
                time.sleep(sleep_seconds)

            except psycopg2.Error as e:
                logging.debug("got error: %s", e)
                logging.debug("EXECUTE NON-SERIALIZATION_FAILURE BRANCH")
                raise e

        raise ValueError(f"Transaction did not succeed after {max_retries} retries")

def test_retry_loop(conn):
    """
    Cause a serialization error in the connection.

    This function can be used to test retry logic.
    """
    with conn.cursor() as cur:
        # The first statement in a transaction can be retried transparently on
        # the server, so we need to add a dummy statement so that our
        # force_retry() statement isn't the first one.
        cur.execute("SELECT now()")
        cur.execute("SELECT crdb_internal.force_retry('1s'::INTERVAL)")
    logging.debug("test_retry_loop(): status message: %s", cur.statusmessage)

def main():
    opt = parse_cmdline()
    logging.basicConfig(level=logging.DEBUG if opt.verbose else logging.INFO)

    conn = psycopg2.connect(opt.dsn)
    create_accounts(conn)
    print_balances(conn)

    amount = 100
    fromId = 1
    toId = 2

    try:
        run_transaction(conn, lambda conn: transfer_funds(conn, fromId, toId, amount))

        # The function below is used to test the transaction retry logic. It
        # can be deleted from production code.
        # run_transaction(conn, test_retry_loop)
    except ValueError as ve:
        # Below, we print the error and continue on so this example is easy to
        # run (and run, and run...). In real code you should handle this error
        # and any others thrown by the database interaction.
        logging.debug("run_transaction(conn, op) failed: %s", ve)
        pass

    print_balances(conn)

    delete_accounts(conn)

    # Close communication with the database.
    conn.close()

def parse_cmdline():
    parser = ArgumentParser(description=__doc__,
                            formatter_class=RawTextHelpFormatter)

    parser.add_argument(
        "dsn",
        help="""\
database connection string

For cockroach demo, use
'postgresql://<username>:<password>@<hostname>:<port>/bank?sslmode=require',
with the username and password created in the demo cluster, and the hostname
and port listed in the (sql/tcp) connection parameters of the demo cluster
welcome message.

For CockroachCloud Free, use
'postgres://<username>:<password>@free-tier.gcp-us-central1.cockroachlabs.cloud:26257/<cluster-name>.bank?sslmode=verify-full&sslrootcert=<your_certs_directory>/cc-ca.crt'.

If you are using the connection string copied from the Console, your username,
password, and cluster name will be pre-populated. Replace
<your_certs_directory> with the path to the 'cc-ca.crt' downloaded from the
Console.
"""
    )

    parser.add_argument("-v", "--verbose",
                        action="store_true", help="print debug info")

    opt = parser.parse_args()
    return opt

if __name__ == "__main__":
    main()
Run the code
The Python code is a command-line utility that accepts the connection string to CockroachDB as a command-line argument. Before running the code, update the connection string as follows:
- Replace <username> and <password> with the SQL username and password that you created earlier.
- Replace <hostname> and <port> with the hostname and port in the (sql/tcp) connection string from the SQL shell welcome text.
$ python3 example.py \
'postgresql://<username>:<password>@<hostname>:<port>/bank?sslmode=require'
The output should show the account balances before and after the funds transfer:
Balances at Fri Oct 30 18:27:00 2020:
(1, 1000)
(2, 250)
Balances at Fri Oct 30 18:27:00 2020:
(1, 900)
(2, 350)
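The command line above passes the connection string as the single positional argument, with an optional -v flag for debug output. To see how that argument handling behaves without connecting to a database, here is a stripped-down sketch of the parser. The function name build_parser and the sample credentials are hypothetical; the dsn argument and the -v/--verbose flag mirror the ones defined in example.py.

```python
from argparse import ArgumentParser


def build_parser():
    """Reduced version of the example's parser: one positional DSN, one flag."""
    parser = ArgumentParser(description="Test psycopg with CockroachDB.")
    parser.add_argument("dsn", help="database connection string")
    parser.add_argument("-v", "--verbose",
                        action="store_true", help="print debug info")
    return parser


# Parse an explicit argument list instead of sys.argv so this runs anywhere.
sample_dsn = "postgresql://maxroach:s3cret@127.0.0.1:61011/bank?sslmode=require"
opt = build_parser().parse_args(["-v", sample_dsn])
print(opt.dsn)
print(opt.verbose)
```

In the example program, the verbose flag is what switches logging from INFO to DEBUG, so passing -v is a quick way to see the status and retry messages the code logs.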
What's next?
Read more about using the Python psycopg2 driver.