Applications are more complex than ever before.
And it’s not only about handling an increasing number of users or reducing response times.
If your application has a database, chances are you’ll need its data in other places as soon as it goes in.
In this context, change data capture (CDC) is the approach you use to capture the changes in the database and deliver them to other consumers.
In this tutorial, you’re going to learn how to stream, in realtime, the changes made to a table in a MySQL database to a React app. Something like this:
Here’s what you need to have installed to follow this tutorial:
You’ll need to have knowledge of:
If you want to track and determine if something in a database has changed, you have three main approaches:

- Polling the database at regular intervals to detect changes.
- Using database triggers that fire on every change.
- Reading the database’s transaction (replication) log.

I’m going to use the third approach because I think it is the most robust. It doesn’t waste resources (like polling) or hurt performance (like triggers).
We’ll read the database changes from the MySQL replication log using the library mysql-binlog-connector-java. Then, we’ll parse the event to extract the relevant data and publish it to a Pusher channel so it can be consumed by a React application.
Here’s the diagram that describes the above process:
For reference, here is a GitHub repository with all the code shown in this tutorial and instructions to run it.
Let’s start by creating a Pusher application.
To get started with Pusher, create your Pusher account or sign in. Then, go to your dashboard and create a Channels app, choosing a name, the cluster closest to your location, and optionally, React as the frontend tech and Java as the backend tech:
This will give you some sample code to get started:
Save your app ID, key, secret and cluster values. We’ll need them later.
The first thing you need to do is enable replication in MySQL.
Replication allows data from one MySQL server (the master) to be copied in an asynchronous way to one or more different MySQL servers (the slaves).
It works by writing all the changes in the master to a binary log file that then is synchronized between master and slaves, so these can apply all those changes.
For this tutorial, you don’t have to set up slave servers. We’re only interested in the binary log.
In the MySQL configuration file (usually at /etc/my.cnf or C:\ProgramData\MySQL\MySQL Server 5.7\my.ini), add the following lines:
```
[mysqld]
server-id        = 1                             #1
log_bin          = /var/log/mysql/mysql-bin.log  #2
expire_logs_days = 10                            #3
max_binlog_size  = 100M                          #4
binlog-format    = row                           #5
```
Line #1 assigns an identifier to the server.
Line #2 specifies the location and base name of the log files. On Windows, it will be something like c:/logs/mysql-bin.log. On Linux, make sure this directory has the necessary permissions for MySQL.
Lines #3 and #4 are optional; they specify the expiration time and maximum size of the log files.
Line #5 is important: it specifies the format in which the log will be written.
There are two main types of replication formats:

- Statement-based replication (SBR), which writes the SQL statements to the binary log.
- Row-based replication (RBR), which records the changes made to individual rows.

For our purposes, RBR will be easier to work with. That’s why the configuration file specifies this format.
Now restart the server.
In a terminal window, connect to the MySQL server using mysql:
mysql -u <YOUR_USER> -p
Now choose or create a database and create the table that is going to be used by the application:
```sql
USE myDatabase;

CREATE TABLE products(
  id int(11) NOT NULL AUTO_INCREMENT,
  name varchar(50) DEFAULT NULL,
  price decimal(6,2),
  PRIMARY KEY (id)
);
```
It’s not recommended to work with a user with administrative privileges like root, so let’s create another user for the application:
CREATE USER '<YOUR_USER>'@'<YOUR_HOST>' IDENTIFIED BY '<YOUR_PASSWORD>';
Give it replication and table privileges:
```sql
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '<YOUR_USER>'@'<YOUR_HOST>';
GRANT ALL PRIVILEGES ON `<INSERT_YOUR_DB_NAME>`.* TO '<YOUR_USER>'@'<YOUR_HOST>';
FLUSH PRIVILEGES;
```
Now execute the following command to check if replication is enabled:
show master status;
It should show something like the following:
```
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 |      001 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
```
It indicates the current log file and the position of the last statement.
If you’re getting <Empty set> or something like that, execute:
show variables like "%log_bin%";
If replication is enabled, you should see something like this:
```
+---------------------------------+--------------------------------+
| Variable_name                   | Value                          |
+---------------------------------+--------------------------------+
| log_bin                         | ON                             |
| log_bin_basename                | /var/log/mysql/mysql-bin       |
| log_bin_index                   | /var/log/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF                            |
| log_bin_use_v1_row_events       | OFF                            |
| sql_log_bin                     | ON                             |
+---------------------------------+--------------------------------+
```
Otherwise, double-check your configuration. You can learn more about replication here.
Now let’s create the Java program that will read the binary log.
It turns out that reading binary logs for change data capture is more common than you think.
Microsoft SQL Server has built-in support for change data capture.
Oracle offers GoldenGate for real-time data integration and replication.
MongoDB offers Change Streams to access real-time data changes.
For MySQL, there are a lot of libraries for reading the binary log and streaming the changes as events to other sources. In this wiki, you can find many of these libraries.
Most of these libraries were made for enterprise systems, so they work natively with Apache Kafka, a distributed publish-subscribe platform that streams events and records to multiple sources.
But if you don’t need something like that, you can use mysql-binlog-connector-java, which allows you to read the binary log file and listen for changes as events from any Java program.
So open your favorite IDE and create a Maven project.
Or just create a directory structure like the following:
```
src
|- main
   |- java
pom.xml
```
In the pom.xml file, specify the project information, Java version, and mysql-binlog-connector-java and pusher-http-java as dependencies:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>MySQLRealtime</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>com.github.shyiko</groupId>
      <artifactId>mysql-binlog-connector-java</artifactId>
      <version>0.16.1</version>
    </dependency>

    <dependency>
      <groupId>com.pusher</groupId>
      <artifactId>pusher-http-java</artifactId>
      <version>1.0.0</version>
    </dependency>
  </dependencies>

</project>
```
Now create a class, let’s say src/ReadLog.java, with the code to connect to MySQL and listen for log events:
```java
import java.io.IOException;

import com.github.shyiko.mysql.binlog.BinaryLogClient;

public class ReadLog {
    public static void main(String[] args) throws IOException {
        BinaryLogClient client =
            new BinaryLogClient("localhost", 3306, "<MYSQL_USER>", "<MYSQL_PASSWORD>");

        client.registerEventListener(event -> {
            System.out.println(event);
        });
        client.connect();
    }
}
```
If you execute this class, the program will block until an event is received from the log.
For example, here are the events you receive when a database is created:
```
Event{header=EventHeaderV4{timestamp=1524607461000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=463, flags=0}, data=null}

Event{header=EventHeaderV4{timestamp=1524607461000, eventType=QUERY, serverId=1, headerLength=19, dataLength=75, nextPosition=557, flags=8}, data=QueryEventData{threadId=6, executionTime=0, errorCode=0, database='test', sql='CREATE DATABASE test'}}
```
You receive an event for the creation of the global transaction identifier (GTID) and the actual query (CREATE DATABASE test).
Here’s an example of the events you receive when a table is created:
```
Event{header=EventHeaderV4{timestamp=1524609716000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1696, flags=0}, data=null}

Event{header=EventHeaderV4{timestamp=1524609716000, eventType=QUERY, serverId=1, headerLength=19, dataLength=181, nextPosition=1896, flags=0}, data=QueryEventData{threadId=6, executionTime=0, errorCode=0, database='test', sql='create table products(id int(11) not null auto_increment, name varchar(50) default null, price decimal(6,2), primary key (id))'}}
```
When you insert a record:
```
Event{header=EventHeaderV4{timestamp=1524609804000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1961, flags=0}, data=null}

Event{header=EventHeaderV4{timestamp=1524609804000, eventType=QUERY, serverId=1, headerLength=19, dataLength=53, nextPosition=2033, flags=8}, data=QueryEventData{threadId=6, executionTime=0, errorCode=0, database='test', sql='BEGIN'}}

Event{header=EventHeaderV4{timestamp=1524609804000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=38, nextPosition=2090, flags=0}, data=TableMapEventData{tableId=109, database='test', table='products', columnTypes=3, 15, -10, columnMetadata=0, 50, 518, columnNullability={1, 2}}}

Event{header=EventHeaderV4{timestamp=1524609804000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=2140, flags=0}, data=WriteRowsEventData{tableId=109, includedColumns={0, 1, 2}, rows=[
    [1, laptop, 999.99]
]}}

Event{header=EventHeaderV4{timestamp=1524609804000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=2171, flags=0}, data=XidEventData{xid=28}}
```
When you update a record:
```
Event{header=EventHeaderV4{timestamp=1524609897000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=2236, flags=0}, data=null}

Event{header=EventHeaderV4{timestamp=1524609897000, eventType=QUERY, serverId=1, headerLength=19, dataLength=53, nextPosition=2308, flags=8}, data=QueryEventData{threadId=6, executionTime=0, errorCode=0, database='test', sql='BEGIN'}}

Event{header=EventHeaderV4{timestamp=1524609897000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=38, nextPosition=2365, flags=0}, data=TableMapEventData{tableId=109, database='test', table='products', columnTypes=3, 15, -10, columnMetadata=0, 50, 518, columnNullability={1, 2}}}

Event{header=EventHeaderV4{timestamp=1524609897000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=47, nextPosition=2431, flags=0}, data=UpdateRowsEventData{tableId=109, includedColumnsBeforeUpdate={0, 1, 2}, includedColumns={0, 1, 2}, rows=[
    {before=[1, laptop, 999.99], after=[1, laptop, 100.01]}
]}}

Event{header=EventHeaderV4{timestamp=1524609897000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=2462, flags=0}, data=XidEventData{xid=29}}
```
When you delete two records:
```
Event{header=EventHeaderV4{timestamp=1524610005000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=2805, flags=0}, data=null}

Event{header=EventHeaderV4{timestamp=1524610005000, eventType=QUERY, serverId=1, headerLength=19, dataLength=53, nextPosition=2877, flags=8}, data=QueryEventData{threadId=6, executionTime=0, errorCode=0, database='test', sql='BEGIN'}}

Event{header=EventHeaderV4{timestamp=1524610005000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=38, nextPosition=2934, flags=0}, data=TableMapEventData{tableId=109, database='test', table='products', columnTypes=3, 15, -10, columnMetadata=0, 50, 518, columnNullability={1, 2}}}

Event{header=EventHeaderV4{timestamp=1524610005000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=49, nextPosition=3002, flags=0}, data=DeleteRowsEventData{tableId=109, includedColumns={0, 1, 2}, rows=[
    [1, laptop, 100.01],
    [2, laptop v2, 999.99]
]}}

Event{header=EventHeaderV4{timestamp=1524610005000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=3033, flags=0}, data=XidEventData{xid=31}}
```
This way, you can see how data manipulation (DML) statements are mapped to events:

- An INSERT generates an event of type EXT_WRITE_ROWS, and you can find the information about the insertion in an object of type WriteRowsEventData.
- An UPDATE generates an event of type EXT_UPDATE_ROWS, and you can find the information about the update in an object of type UpdateRowsEventData.
- A DELETE generates an event of type EXT_DELETE_ROWS, and you can find the information about the deletion in an object of type DeleteRowsEventData.

In addition, all of these events are preceded by a TABLE_MAP event with information about the table and columns that are being modified.
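To make that dispatch concrete, here’s a minimal sketch of the mapping. The empty classes below are simplified stand-ins for the library’s event-data types, not the real com.github.shyiko.mysql.binlog.event classes:

```java
// Simplified stand-ins for the library's event-data classes (placeholders,
// not the real mysql-binlog-connector-java types).
class WriteRowsEventData {}
class UpdateRowsEventData {}
class DeleteRowsEventData {}

public class EventDispatch {
    // Map an event-data object to the DML operation it represents.
    static String operationFor(Object data) {
        if (data instanceof WriteRowsEventData) return "insert";
        if (data instanceof UpdateRowsEventData) return "update";
        if (data instanceof DeleteRowsEventData) return "delete";
        return "ignore"; // GTID, QUERY, XID, etc. are not row changes
    }

    public static void main(String[] args) {
        System.out.println(operationFor(new WriteRowsEventData()));  // insert
        System.out.println(operationFor(new DeleteRowsEventData())); // delete
        System.out.println(operationFor("something else"));          // ignore
    }
}
```

The real program below follows the same pattern, only with the actual library types and the row payloads attached.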
So we need to listen for these events.
The only problem is that if you need to keep track of the changes of many tables separately, you cannot rely on the tableId field, because this ID may change between executions.
You can change the way events are deserialized, but a simpler approach is to keep track of the table names and IDs in a map. With that in mind, you can modify the program like this:
```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;

public class ReadLog {
    public static void main(String[] args) throws IOException {
        final Map<String, Long> tableMap = new HashMap<String, Long>();
        BinaryLogClient client =
            new BinaryLogClient("localhost", 3306, "<MYSQL_USER>", "<MYSQL_PASSWORD>");

        client.registerEventListener(event -> {
            EventData data = event.getData();

            if (data instanceof TableMapEventData) {
                TableMapEventData tableData = (TableMapEventData) data;
                tableMap.put(tableData.getTable(), tableData.getTableId());
            }
        });
        client.connect();
    }
}
```
Notice how the program checks the subtype of EventData to get the information.
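The table-tracking idea can also be exercised in isolation. Here’s a minimal sketch with a hypothetical TableTracker helper (not part of mysql-binlog-connector-java) and simulated events:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (an assumption for illustration, not a library class):
// it keeps the latest name -> tableId mapping seen in TABLE_MAP events, so
// row events can be matched by table name even if the numeric id changes
// between executions of the MySQL server.
public class TableTracker {
    private final Map<String, Long> tableIds = new HashMap<>();

    // Call this when a TABLE_MAP event arrives.
    public void onTableMap(String tableName, long tableId) {
        tableIds.put(tableName, tableId);
    }

    // Call this for row events: does tableId belong to the named table?
    public boolean matches(String tableName, long tableId) {
        Long known = tableIds.get(tableName);
        return known != null && known == tableId;
    }

    public static void main(String[] args) {
        TableTracker tracker = new TableTracker();
        tracker.onTableMap("products", 109L);
        System.out.println(tracker.matches("products", 109L)); // true
        // After a restart MySQL may assign a new id; the next TABLE_MAP
        // event simply overwrites the mapping.
        tracker.onTableMap("products", 112L);
        System.out.println(tracker.matches("products", 112L)); // true
    }
}
```

This is exactly what the tableMap in the program above does inline, just factored out so the logic is easy to see.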
Now, let’s add the Pusher object with the information you got when you created the app:
```java
public class ReadLog {
    public static void main(String[] args) throws IOException {
        final Map<String, Long> tableMap = new HashMap<String, Long>();

        Pusher pusher =
            new Pusher("<PUSHER_APP_ID>", "<PUSHER_APP_KEY>", "<PUSHER_APP_SECRET>");
        pusher.setCluster("<PUSHER_APP_CLUSTER>");
        pusher.setEncrypted(true);

        // ...
    }
}
```
Then, when the event is an insert, update or delete, you can check whether it corresponds to the products table, extract the product information, and publish it as a map to a products channel.
Here’s the code for INSERT events:
```java
public class ReadLog {
    static final String PRODUCT_TABLE_NAME = "products";

    public static void main(String[] args) throws IOException {
        // ...

        client.registerEventListener(event -> {
            EventData data = event.getData();

            if (data instanceof TableMapEventData) {
                // ...
            } else if (data instanceof WriteRowsEventData) {
                WriteRowsEventData eventData = (WriteRowsEventData) data;
                if (eventData.getTableId() == tableMap.get(PRODUCT_TABLE_NAME)) {
                    for (Object[] product : eventData.getRows()) {
                        pusher.trigger(
                            PRODUCT_TABLE_NAME, "insert", getProductMap(product)
                        );
                    }
                }
            }
        });
        client.connect();
    }

    static Map<String, String> getProductMap(Object[] product) {
        Map<String, String> map = new HashMap<>();
        map.put("id", String.valueOf(product[0]));
        map.put("name", String.valueOf(product[1]));
        map.put("price", String.valueOf(product[2]));

        return map;
    }
}
```
For the update event, only the after data is needed. The before and after images of each row are formatted as a map entry, where after is the value part of this structure:
```java
public class ReadLog {
    public static void main(String[] args) throws IOException {
        // ...

        client.registerEventListener(event -> {
            EventData data = event.getData();

            if (data instanceof TableMapEventData) {
                // ...
            } else if (data instanceof WriteRowsEventData) {
                // ...
            } else if (data instanceof UpdateRowsEventData) {
                UpdateRowsEventData eventData = (UpdateRowsEventData) data;
                if (eventData.getTableId() == tableMap.get(PRODUCT_TABLE_NAME)) {
                    for (Map.Entry<Serializable[], Serializable[]> row :
                            eventData.getRows()) {
                        pusher.trigger(
                            PRODUCT_TABLE_NAME, "update", getProductMap(row.getValue())
                        );
                    }
                }
            }
        });
        client.connect();
    }

    // ...
}
```
For the delete event, you’ll only need the ID of the deleted record:
```java
public class ReadLog {
    public static void main(String[] args) throws IOException {
        // ...

        client.registerEventListener(event -> {
            EventData data = event.getData();

            if (data instanceof TableMapEventData) {
                // ...
            } else if (data instanceof WriteRowsEventData) {
                // ...
            } else if (data instanceof UpdateRowsEventData) {
                // ...
            } else if (data instanceof DeleteRowsEventData) {
                DeleteRowsEventData eventData = (DeleteRowsEventData) data;
                if (eventData.getTableId() == tableMap.get(PRODUCT_TABLE_NAME)) {
                    for (Object[] product : eventData.getRows()) {
                        pusher.trigger(PRODUCT_TABLE_NAME, "delete", product[0]);
                    }
                }
            }
        });
        client.connect();
    }

    // ...
}
```
Now, any application listening on the products channel will get the information about the database changes.
Let’s build a React client to show this.
Let’s use create-react-app to bootstrap a React app.
Execute the following command in a terminal window to create a new app:
npx create-react-app my-app
Now go into the app directory and install the Pusher dependency with npm:
```
cd my-app
npm install --save pusher-js
```
Open the file src/App.css and add the following CSS styles:
```css
.table {
  border: 2px solid #FFFFFF;
  width: 100%;
  text-align: center;
  border-collapse: collapse;
}
.table td, .table th {
  border: 1px solid #FFFFFF;
  padding: 3px 4px;
}
.table tbody td {
  font-size: 13px;
}
.table thead {
  background: #FFFFFF;
  border-bottom: 4px solid #333333;
}
.table thead th {
  font-size: 15px;
  font-weight: bold;
  color: #333333;
  text-align: center;
  border-left: 2px solid #333333;
}
.table thead th:first-child {
  border-left: none;
}
```
Now let’s create a new component, src/Table.js, to show the product information (received as a property) in a table:
```js
import React, { Component } from 'react';
import './App.css';

export default class Table extends Component {
  render() {
    const rowsMapped = this.props.rows.map(row => (
      <tr key={row.id}>
        <td>{row.id}</td>
        <td>{row.name}</td>
        <td>{row.price}</td>
      </tr>
    ));

    return (
      <table className="table">
        <thead>
          <tr>
            <th>ID</th>
            <th>Name</th>
            <th>Price</th>
          </tr>
        </thead>
        <tbody>
          {rowsMapped}
        </tbody>
      </table>
    );
  }
}
```
Now modify the file src/App.js to import this component and Pusher:
```js
import React, { Component } from 'react';
import logo from './logo.svg';
import './App.css';

import Table from './Table.js';

import Pusher from 'pusher-js';

class App extends Component {
  // ...
}
```
Let’s have the array of rows as the state of this component, and while we are at the constructor, let’s bind the functions we are going to use to insert, update and delete items:
```js
// ...

class App extends Component {
  constructor(props) {
    super(props);
    this.state = {rows: []};

    this.insert = this.insert.bind(this);
    this.update = this.update.bind(this);
    this.delete = this.delete.bind(this);
  }
}
```
In the componentDidMount method, let’s configure the Pusher object and subscribe to the channel to get the events:
```js
// ...

class App extends Component {
  constructor(props) {
    // ...
  }

  componentDidMount() {
    this.pusher = new Pusher('<PUSHER_APP_KEY>', {
      cluster: '<PUSHER_APP_CLUSTER>',
      encrypted: true,
    });
    this.channel = this.pusher.subscribe('products');

    this.channel.bind('insert', this.insert);
    this.channel.bind('update', this.update);
    this.channel.bind('delete', this.delete);
  }
}
```
These are the functions to insert, update and delete items from this.state.rows:
```js
// ...

class App extends Component {
  // ...

  insert(data) {
    this.setState(prevState => ({
      rows: [ data, ...prevState.rows ]
    }));
  }

  update(data) {
    this.setState(prevState => ({
      rows: prevState.rows.map(el =>
        el.id === data.id ? data : el
      )
    }));
  }

  delete(id) {
    this.setState(prevState => ({
      rows: prevState.rows.filter(el => el.id !== String(id))
    }));
  }
}
```
Finally, the render function will look like this:
```js
// ...

class App extends Component {
  // ...

  render() {
    return (
      <div className="App">
        <header className="App-header">
          <img src={logo} className="App-logo" alt="logo" />
          <h1 className="App-title">Welcome to React</h1>
        </header>
        <Table rows={this.state.rows} />
      </div>
    );
  }
}
```
And that’s it.
Let’s test the application.
Make sure the MySQL server is running with replication enabled.
If you’re working with an IDE, run the class ReadLog.
Otherwise, you can add this property to the pom.xml file:
```xml
<properties>
  ...
  <exec.mainClass>ReadLog</exec.mainClass>
</properties>
```
And execute this command to run the app:
mvn exec:java
For the React app, inside the app directory, execute:
npm start
A browser window will open at http://localhost:3000/. From there, you can connect to the database with the mysql client and insert, update or delete records in the products table:
In this tutorial, you have learned how to turn MySQL into a realtime database by using the replication log to publish the changes made to a database using Pusher.
You used mysql-binlog-connector-java to get the insert, update and delete events from the log. However, at the time of this writing, the current version of MySQL (MySQL 8.0.11) is not yet supported.
But there are other options. As mentioned before, in this wiki you can find more libraries to work with MySQL binary log.
In this blog post, you can find another way to extract data from MySQL using Alibaba’s open sourced Canal project.
The applications presented in this tutorial are simple, but they show how change data capture using transaction logs works.
They can be extended in many ways:
Remember that all the source code for these applications is available on GitHub.