Use RisingWave in your Python application
RisingWave provides a Python SDK risingwave-py
(currently under public preview) to help user develops event-driven applications.
As RisingWave is wire-compatible with PostgreSQL, you can also use third-party PostgreSQL drivers like psycopg2
and sqlalchemy
to interact with RisingWave from your Python applications.
Use risingwave-py
to connect to RisingWave
risingwave-py is a RisingWave Python SDK that provides the following capabilities:
- Interact with RisingWave via Pandas DataFrame.
- Subscribe and process changes from RisingWave tables or materialized views.
- Run SQL commands supported in RisingWave.
Run RisingWave
To learn about how to run RisingWave, see Run RisingWave.
Connect to RisingWave
To connect to RisingWave via risingwave-py
:
from risingwave import RisingWave, RisingWaveConnOptions
# Connect to RisingWave instance on localhost with named parameters
rw = RisingWave(
RisingWaveConnOptions.from_connection_info(
host="localhost", port=4566, user="root", password="root", database="dev"
)
)
# Connect to RisingWave instance on localhost with connection string
rw = RisingWave(RisingWaveConnOptions("postgresql://root:root@localhost:4566/dev"))
# You can create a new SQL connection and execute operations under the with statement.
# This is the recommended way for python sdk usage.
with rw.getconn() as conn:
conn.insert(...)
conn.fetch(...)
conn.execute(...)
conn.mv(...)
conn.on_change(...)
# You can also use the existing connection created by RisingWave object to execute operations.
# This will be used in the later sections for simplicity.
rw.insert(...)
rw.fetch(...)
rw.execute(...)
rw.mv(...)
rw.on_change(...)
Ingestion into RisingWave
Load a Pandas DataFrame into RisingWave
from datetime import datetime
import pandas as pd
df = pd.DataFrame(
{
"product": ["foo", "bar"],
"price": [123.4, 456.7],
"ts": [datetime.strptime("2023-10-05 14:30:00", "%Y-%m-%d %H:%M:%S"),
datetime.strptime("2023-10-05 14:31:20", "%Y-%m-%d %H:%M:%S")],
}
)
# A test table will be created if not exist in risingwave with the correct schema
rw.insert(table_name="test", data=df)
# You can provide an optional force_flush parameter and set it to True
# if you would the inserted data to be visible in fetch query immediately.
# Otherwise, data will be inserted in batches asynchronusly for better performance.
# rw.insert(table_name="test", data=df, force_flush = True)
Load data into RisingWave from external systems
# Create a table and load data from upstream kafka
rw.execute("""
CREATE TABLE IF NOT EXISTS source_abc
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test_topic'
)
FORMAT UPSERT ENCODE AVRO (
schema.registry = 'http://127.0.0.1:8081',
schema.registry.username='your_schema_registry_username',
schema.registry.password='your_schema_registry_password'
)""")
For supported sources and the SQL syntax, see this topic.
Query from RisingWave
from risingwave import OutputFormat
result: pd.DataFrame = rw.fetch("""
SELECT window_start, window_end, product, ROUND(avg(price)) as avg_price
FROM tumble(test, ts, interval '10 seconds')
GROUP BY window_start, window_end, product""",
format=OutputFormat.DATAFRAME)
print(result)
# Output:
# window_start window_end product avg_price
# 0 2023-10-05 14:31:20 2023-10-05 14:31:30 bar 457.0
# 1 2023-10-05 14:30:00 2023-10-05 14:30:10 foo 123.0
# You can also use OutputFormat.RAW to get back list of tuples as the query results
# rw.fetch("...", format=OutputFormat.RAW)
# [(datetime.datetime(2023, 10, 5, 14, 31, 20), datetime.datetime(2023, 10, 5, 14, 31, 30), 'bar', 457.0),
# (datetime.datetime(2023, 10, 5, 14, 30), datetime.datetime(2023, 10, 5, 14, 30, 10), 'foo', 123.0)]
Event-driven processing with RisingWave
Event-driven applications depend on real-time data processing to react to events as they occur. With risingwave-py
, you can define materialized views using SQL and run them in RisingWave. Behind the scenes, events are processed continuously, and the results are incrementally maintained.
Define the processing logic in SQL
In the following example, test_mv
is created to incrementally maintain the result of the defined SQL as events are ingested in to the test
table.
mv = rw.mv(name="test_mv",
stmt="""SELECT window_start, window_end, product, ROUND(avg(price)) as avg_price
FROM tumble(test, ts, interval '10 seconds')
GROUP BY window_start, window_end, product""")
Subscribe changes from table / materialized view with your own handler
In addition to using SQL to do ad-hoc query on tables and materialized views. With risingwave-py
, You can also subscribe changes from table / materialized view and define handler of the change events from table / materialized view for you applications.
# Write your own handler
# the event will contains all fields of the subscribed MV/Table plus two additional columns:
# - op: varchar. The change operations. Valid values: [Insert, UpdateInsert, Delete, UpdateDelete].
# The reason why we have UpdateInsert and UpdateDelete is because RisingWave treats an UPDATE
# as a delete of the old value followed by an insert of the new value.
# - rw_timestamp: bigint. The Unix timestamp in milliseconds when the data was written.
def simple_event_handler(event: pd.DataFrame):
for _, row in event.iterrows():
# Trigger an action (e.g. place an order via REST API) when the avg_price exceeds 300
if (row["op"] == "UpdateInsert" or row["op"] == "Insert") and row["avg_price"] >= 300:
print(
f"{row['window_start']} - {row['window_end']}: {row['product']} avg price {row['avg_price']} exceeds 300")
# ...
import threading
# Subscribe changes of a materialized view and feed it to your own handler.
# Run on_change in a separate thread without blocking the main thread.
threading.Thread(
target=lambda: mv.on_change(
# Pass your handler here
handler = simple_event_handler,
# Support DATAFRAME and RAW tuples here
output_format=OutputFormat.DATAFRAME,
# If set to True, progress of the subscription will be saved and can be recovered on python application crashed
persist_progress=True,
# Maximal number of rows returned each time
max_batch_size = 10)
).start()
# Subscribe changes of a table and print them to console.
# Run on_change in a separate thread without blocking the main thread.
threading.Thread(
target=lambda: rw.on_change(
subscribe_from="test",
handler = lambda data: print(data),
output_format=OutputFormat.RAW,
persist_progress=False,
max_batch_size = 5)
).start()
# Future inserted data into the base table will be reflected in the subscriptions
# df = pd.DataFrame(
# {
# "product": ["foo", "bar"],
# "price": [1000.2, 5000.4],
# "ts": [datetime.strptime("2023-10-05 17:30:00", "%Y-%m-%d %H:%M:%S"),
# datetime.strptime("2023-10-05 17:31:20", "%Y-%m-%d %H:%M:%S")],
# }
# )
# rw.insert(table_name="test", data=df)
For more details, please refer to the risingwave-py
GitHub repo.
Use psycopg2
to connect to RisingWave
In this section, we use the psycopg2
driver to connect to RisingWave.
Run RisingWave
To learn about how to run RisingWave, see Run RisingWave.
Install the psgcopg2
driver
For information about how to install psycopg
and the difference between psycopg
and psycopg-binary
, see the official psycopg documentation.
Connect to RisingWave
To connect to RisingWave via psycopg2
:
import psycopg2
conn = psycopg2.connect(host="127.0.0.1", port=4566, user="root", dbname="dev")
Create a source
The code below creates a source walk
with the datagen
connector. The datagen
connector is used to generate mock data. The walk
source consists of two columns, distance
and duration
, which respectively represent the distance and the duration of a walk. The source is a simplified version of the data that is tracked by smart watches.
import psycopg2
conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev") # Connect to RisingWave.
conn.autocommit = True # Set queries to be automatically committed.
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE walk(distance INT, duration INT)
WITH (
connector = 'datagen',
fields.distance.kind = 'sequence',
fields.distance.start = '1',
fields.distance.end = '60',
fields.duration.kind = 'sequence',
fields.duration.start = '1',
fields.duration.end = '30',
datagen.rows.per.second='15',
datagen.split.num = '1'
) FORMAT PLAIN ENCODE JSON""") # Execute the query.
conn.close() # Close the connection.
All the code examples in this guide include a section for connecting to RisingWave. If you perform multiple actions within one connection session, you do not need to repeat this section.
Create a materialized view
The code in this section creates a materialized view counter
to capture the latest total distance and duration.
import psycopg2
conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("""CREATE MATERIALIZED VIEW counter
AS SELECT
SUM(distance) as total_distance,
SUM(duration) as total_duration
FROM walk;""")
conn.close()
Query a materialized view
The code in this section queries the materialized view counter
to get real-time data.
import psycopg2
conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT * FROM counter;")
print(cur.fetchall())
conn.close()
Use sqlalchemy
to connect to RisingWave
In this section, we use the SQLAlchemy driver to connect to RisingWave.
Run RisingWave
To learn about how to run RisingWave, see Run RisingWave.
Install necessary Python packages
Ensure you have Python3 installed.
For more information about sqlalchemy
, see the SQLAlchemy. Refer to the documentation version that corresponds to the version of SQLAlchemy that you run.
For information about how to install psycopg-binary
, see the official psycopg documentation.
pip3 install SQLAlchemy sqlalchemy-risingwave psycopg2-binary
Connect to RisingWave
To connect to RisingWave via sqlalchemy
:
DB_URI = 'risingwave+psycopg2://root@risingwave-standalone:4566/dev'
engine = create_engine(DB_URI)
Note that RisingWave does not provide direct compatibility with sqlaclehmy-postgres
so risingwave+psycopg2
is used as the URI scheme. The rest of the URL follows the same format as the PostgreSQL driver.
Create a source
The code below creates a table users
using the engine created in the previous section.
with engine.connect() as conn:
conn.execute("""CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name VARCHAR,
age INTEGER)""")
You can create materialized views and query from materialized views using the same format shown above.