MySQL to Hadoop Step-By-Step

We had a great webinar on Thursday about replicating from MySQL to Hadoop (watch the whole thing). One of the questions at the end was ‘is there an easy way to test this?’

Sadly we can’t give out convenient ready-to-run downloads of these things because of licensing and other complexities, so I want to make it as simple and straightforward as possible by giving you the directions to follow. I’m going to point to the Continuent Documentation every now and then so this is not too crowded, but we should get through it pretty easily.

Major Decisions

For this to work: 

  • We’ll set up two VMs, one the master (running MySQL), the other the slave (running Cloudera)
  • The two VMs must be able to reach each other on the network. It doesn’t matter whether they are running Internal, NAT, or bridge-mode networking, they just need to be able to ping and SSH to each other. Switch off firewalls to prevent port weirdness.
  • For convenience, update your /etc/hosts to have a host1 (the master) and host2 (the slave)
  • The master must have followed the prereqs; for the slave it’s optional, but highly recommended
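
A quick way to confirm the /etc/hosts part of that list is a small shell function like the one below. This is only a sketch: the function name check_hosts_file is made up for this post, and it simply looks for each host name in the columns after the IP address, ignoring comments.

```shell
# Report whether a hosts file has an entry for each named host.
# check_hosts_file is a hypothetical helper, not part of Tungsten Replicator.
# Usage: check_hosts_file /etc/hosts host1 host2
check_hosts_file() {
    hosts_file=$1
    shift
    missing=0
    for name in "$@"; do
        if awk -v name="$name" '
            /^[ \t]*#/ { next }                  # skip comment lines
            {
                for (i = 2; i <= NF; i++) {      # columns after the IP address
                    if ($i ~ /^#/) break         # stop at a trailing comment
                    if ($i == name) found = 1
                }
            }
            END { exit found ? 0 : 1 }
        ' "$hosts_file"; then
            echo "$name: ok"
        else
            echo "$name: missing"
            missing=1
        fi
    done
    return "$missing"
}
```

Run it on both VMs as check_hosts_file /etc/hosts host1 host2; a non-zero exit status means at least one name still needs adding.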

With that in mind, let’s get started.

Step 1: Setup your Master Host

There are a number of ways you can do this. If you want to simplify things and have VirtualBox, try downloading this VM. It’s a 1.5GB download containing an OVF VM, and is an Ubuntu host, with our prerequisites followed. To use this:

  1. Uncompress the package.
  2. Import the VM into your VirtualBox.
  3. If you want, change the network type from Internal to a bridged or NAT environment.

Using internal networking, you can log in to this using:

shell> ssh -p2222 tungsten@localhost

Passwords are ‘password’ for tungsten and root.

If you don’t want to follow this but want your own VM:

  1. Create a new VM with 1-2GB of RAM, and 8GB or more of disk space
  2. Install your OS of choice, either Ubuntu or CentOS
  3. Follow our prerequisite instructions
  4. Make sure MySQL is set up and running, and that binary logging is enabled
  5. Make sure it has a valid IP address
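
If you are not sure whether binary logging is enabled, MySQL will tell you through the log_bin variable. The sketch below wraps that check in a small function; the name binlog_enabled is made up here, and it assumes it is fed the tab-separated output of the mysql client run with -N (no column names) and -B (batch mode).

```shell
# Succeed only if the log_bin variable is reported as ON.
# Reads from standard input the output of:
#   mysql -N -B -e "SHOW VARIABLES LIKE 'log_bin'"
# binlog_enabled is a hypothetical helper name.
binlog_enabled() {
    awk -F '\t' '
        $1 == "log_bin" { value = $2 }
        END { exit (value == "ON" ? 0 : 1) }
    '
}
```

For example, on the master:

shell> mysql -u tungsten -p -N -B -e "SHOW VARIABLES LIKE 'log_bin'" | binlog_enabled && echo "binary logging is on"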

Step 2: Setup Tungsten Replicator

Download the latest Tungsten Replicator binary from this page

Unpack the file:

shell> tar zxf tungsten-replicator-3.0.tar.gz

Change into the directory:

shell> cd tungsten-replicator-3.0

Create a new replicator installation; this will read from the binary log into THL (the Transaction History Log):

shell> ./tools/tpm install alpha \
--install-directory=/opt/continuent \
--master=host1 \
--members=host1 \
--java-file-encoding=UTF8 \
--java-user-timezone=GMT \
--mysql-enable-enumtostring=true \
--mysql-enable-settostring=true \
--mysql-use-bytes-for-string=false \
--svc-extractor-filters=colnames,pkey \
--property=replicator.filter.pkey.addColumnsToDeletes=true \
--property=replicator.filter.pkey.addPkeyToInserts=true \
--replication-password=password \
--replication-user=tungsten \
--skip-validation-check=HostsFileCheck \
--skip-validation-check=ReplicationServicePipelines \
--start-and-report=true

For a full description of what’s going on here, see this page and click on the magnifying glass. You’ll get the full description of each option.

To make sure everything is OK, check the status report from trepctl that is printed at the end of the install (that is what --start-and-report=true is for). If the replicator is running and shows its status as online, we’re ready.
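
If you want to script that check rather than read the report by eye, something along these lines should work. It is a sketch only: replicator_state is a made-up name, and it assumes the trepctl status output contains a line of the form "state : VALUE", with ONLINE as the value for a healthy replicator.

```shell
# Print the value of the "state" line from `trepctl status` output on stdin.
# Assumes lines look like "state   : ONLINE"; replicator_state is a
# hypothetical helper, not a Tungsten command.
replicator_state() {
    awk '
        /^[ \t]*state[ \t]*:/ {
            sub(/^[^:]*:[ \t]*/, "")   # drop the "state :" prefix
            sub(/[ \t]+$/, "")         # drop trailing whitespace
            print
            exit
        }
    '
}
```

You would then use it as:

shell> trepctl status | replicator_state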

Step 3: Get your Cloudera Host Ready

There are lots of ways to get Cloudera’s Hadoop solution installed. The ready-to-run VM is the simplest by far.

  1. Download the Cloudera VM quick start host from here; there are versions for VirtualBox, VMware, and KVM.
  2. Set the networking type to match the master.
  3. Start the host
  4. Set the hostname to host2
  5. Update the networking to an IP address that can talk to the master.
  6. Update /etc/hosts to add the IP addresses of host1 and host2, e.g.:

192.168.0.2 host1
192.168.0.3 host2

Add a ‘tungsten’ user which we will use to install Tungsten Replicator.

Step 4: Install your Hadoop Slave

Download the latest Tungsten Replicator binary from this page

Unpack the file:

shell> tar zxf tungsten-replicator-3.0.tar.gz

Change into the directory:

shell> cd tungsten-replicator-3.0

Create a new replicator installation; this will read the information from the master (host1) and apply it to this host (host2):

shell> ./tools/tpm install alpha \
--batch-enabled=true \
--batch-load-language=js \
--batch-load-template=hadoop \
--datasource-type=file \
--install-directory=/opt/continuent \
--java-file-encoding=UTF8 \
--java-user-timezone=GMT \
--master=host1 \
--members=host2 \
'--property=replicator.datasource.applier.csv.fieldSeparator=\\u0001' \
--property=replicator.datasource.applier.csv.useQuotes=false \
--property=replicator.stage.q-to-dbms.blockCommitInterval=1s \
--property=replicator.stage.q-to-dbms.blockCommitRowCount=1000 \
--replication-password=secret \
--replication-user=tungsten \
--skip-validation-check=DatasourceDBPort \
--skip-validation-check=DirectDatasourceDBPort \
--skip-validation-check=HostsFileCheck \
--skip-validation-check=InstallerMasterSlaveCheck \
--skip-validation-check=ReplicationServicePipelines \
--start-and-report=true

For a description of the options, visit this page and click on the second magnifying glass to get the description.

As before, we want everything to be running and the replicator to be online. To check, run:

shell> trepctl status

This should tell you everything is running – if you get an error about this not being found, source the environment to populate your PATH correctly:

shell> source /opt/continuent/share/env.sh
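
If you find yourself doing this in scripts, the same "source it only if needed" logic can be wrapped up as below. This is a sketch; ensure_command is a made-up name, and the env.sh path is the one from the install directory used above.

```shell
# Make sure a command is available, sourcing an environment file if it is not.
# ensure_command is a hypothetical helper name.
# Usage: ensure_command trepctl /opt/continuent/share/env.sh
ensure_command() {
    cmd=$1
    env_file=$2
    # Already on the PATH: nothing to do.
    if command -v "$cmd" >/dev/null 2>&1; then
        return 0
    fi
    # The file has to be sourced, not executed, so that the PATH change
    # stays in the current shell.
    if [ -r "$env_file" ]; then
        . "$env_file"
    fi
    # Report whether the command can be found now.
    command -v "$cmd" >/dev/null 2>&1
}
```

Because the function runs in your current shell, the PATH change is still there after it returns, so a plain trepctl status works afterwards.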

If the replicator isn’t online, use the docs to help determine the reason, or use our discussion group to ask questions.

Step 5: Generating DDL

For your chosen MySQL database schema, you need to generate the staging and live table definitions for Hive.

A tool, ddlscan, is provided for this. You need to run it and provide the JDBC connect string for your database, and your user and password. If you followed the prereqs, use the one for the tungsten user.

First create the live table DDL:

shell> ddlscan -user tungsten -url 'jdbc:mysql://host1:3306/test' -pass password -template ddl-mysql-hive-0.10.vm -db test > schema.sql

Now apply it to Hive:

shell> cat schema.sql | hive

To create Hive tables that read the staging files loaded by the replicator, use the ddl-mysql-hive-0.10-staging.vm template on the same database:

shell> ddlscan -user tungsten -url 'jdbc:mysql://host1:3306/test' -pass password -template ddl-mysql-hive-0.10-staging.vm -db test > schema-staging.sql

Now apply it to Hive again:

shell> cat schema-staging.sql | hive

Step 6: Start Writing Data

Hopefully by this point you’ve got two VMs, one running MySQL and the master replicator extracting info from the MySQL binary log. On the other, you have a basic Cloudera instance with a slave replicator writing changes. Both replicators should be online (use ‘trepctl status’ to check).

All you need to do is start writing data into the tables you selected when creating the DDL. That should be it – you should see data start to stream into Hadoop.

Getting Data into Hadoop in real-time

Moving data between databases is hard. Without ever intending it, I seem to have spent a lifetime working on solutions for getting data into and out of databases, but more frequently between them. In fact, my first job out of university was migrating data from BRS/Text, a free-text database (probably what we would call a NoSQL database today), into a more structured Oracle.

Today I spend some of my time working in Big Data, more often than not migrating information from existing data stores into Big Data so that it can be analysed, something I covered in more detail here:

http://www.ibm.com/developerworks/library/bd-sqltohadoop1/index.html
http://www.ibm.com/developerworks/library/bd-sqltohadoop2/index.html
http://www.ibm.com/developerworks/library/bd-sqltohadoop3/

The problem with the current techniques, Sqoop included, is that they rely on a relatively manual, even basic, transfer process. Dump your data out, reload it back again into Hadoop.

Sqoop automates much of the process, but it is not entirely reliable, especially if you want to do more than a simple dump and load. Serial loading, or incrementally transferring data from MySQL or Oracle, is fraught with problems, not least of which is that it requires adding a timestamp to your data structure to get the best results out of it.

Perhaps worse though is that Sqoop is an intermittent, even periodic transfer system. Incremental loading works by copying all the changed records since a specific point in time. Running it too frequently is counterproductive, which means you end up using a 15-minute or every-couple-of-hours period, depending on your database activity.

Most databases have some kind of stream of changes that enables you to see everything that has happened on the database. With MySQL, that’s the binary log. And with the Open Source Tungsten Replicator tool we take advantage of that so that we can replicate into MySQL, and indeed into Oracle, MongoDB and Vertica, among others.

Reading the data out from MySQL is lightweight since the master just reads the contents of the binary log, especially compared to Sqoop, which uses read locks and SELECT * with and without LIMIT clauses.

Right now we’re working on an applier that writes that data into Hadoop in real time from MySQL. Unlike Sqoop, we provide a continuous stream of changes from MySQL into the immutable store of Hadoop.

But the nature of Hadoop, and of loading into it, presents some interesting issues, not least of which (if you’ve been following my other articles) is the fact that data written into Hadoop is immutable. For data that is constantly changing, an immutable store is not the obvious destination.

We get round that by using the batch loading system to create CSV files that contain the data, changes and sequence numbers, and then loading that information into Hadoop. In fact, Robert has updated the batch loader to use a new JavaScript based system (of which more in a future blog post) that simplifies the entire process, without requiring a direct connection or interface to Hadoop (although we can write directly into HDFS).

For example, the MySQL row:


| 3 | #1 Single | 2006 | Cats and Dogs (#1.4) |

is represented within the generated staging files as:


I^A1318^A3^A3^A#1 Single^A2006^ACats and Dogs (#1.4)

That’s directly accessible by Hive. In fact, using our ddlscan tool, we can even create the Hive table definitions for you:


ddlscan -user tungsten -url 'jdbc:mysql://host1:13306/test' -pass password \
-template ddl-mysql-hive-0.10.vm -db test
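
To make the staging row format shown earlier a little more concrete, here is a sketch that splits a line on the Ctrl-A (\001) separator and labels the pieces. The labels are my reading of the example row, not something the replicator prints: I am assuming the first three fields are the operation code, the sequence number and a row id, with the original table columns following. The function name describe_staging_row is made up.

```shell
# Split staging-file lines on Ctrl-A (\001) and label each field.
# Assumption: fields 1-3 are operation code, sequence number and row id;
# the remaining fields are the original table columns.
# describe_staging_row is a hypothetical helper name.
describe_staging_row() {
    awk 'BEGIN { FS = "\001" }
    {
        printf "op=%s seqno=%s rowid=%s\n", $1, $2, $3
        for (i = 4; i <= NF; i++) {
            printf "col%d=%s\n", i - 3, $i
        }
    }'
}
```

Feeding it the row from the example gives one labelled line per field, which makes it easier to see that the data columns arrive unchanged after the change metadata.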

Then we can use that record of changes to create a live version of the data, using a straightforward query within Hive. In fact, Hive provides the final crucial stage of the loading process by giving us that live view of the change data, and we simplify that element by providing the core data, and ensuring that the CSV data is in the right format for Hive to use the files without changes.
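
The idea behind that live view can be sketched outside Hive too. The awk below is not the Hive query we use, just an illustration of the same logic on the staging format: for each key keep only the most recent change, and drop the key if that change is a delete. It assumes the first data column is the primary key, that I and D are the only operation codes, and that an update arrives as a delete followed by an insert. The name live_view is made up.

```shell
# Reduce a stream of staging rows (Ctrl-A separated) to the current rows.
# Assumptions: field 1 = op (I or D), field 2 = seqno, field 3 = row id,
# field 4 = primary key, remaining fields = other columns.
# live_view is a hypothetical helper, not the Hive query itself.
live_view() {
    awk 'BEGIN { FS = "\001"; OFS = "\001" }
    {
        key = $4
        # Keep only the latest change per key, ordered by seqno then row id.
        if (!(key in seqno) || $2 + 0 > seqno[key] ||
            ($2 + 0 == seqno[key] && $3 + 0 > rowid[key])) {
            seqno[key] = $2 + 0
            rowid[key] = $3 + 0
            op[key] = $1
            # Strip the three metadata fields, leaving the original columns.
            row = $4
            for (i = 5; i <= NF; i++) row = row OFS $i
            data[key] = row
        }
    }
    END {
        # A key whose latest change is a delete no longer exists.
        for (key in op) if (op[key] == "I") print data[key]
    }' | sort
}
```

The final sort is only there to make the output order predictable, since awk does not guarantee any order when looping over an array.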

The process is quite remarkable; speed-wise for direct dumps, Tungsten Replicator is comparable to Sqoop, but when it comes to change data, the difference is that we have the information in real time. You don’t have to wait for the next Sqoop load, or for the incremental loading and row selection of Sqoop; instead, we just apply the changes written into the binary log.

Of course, we can fine tune the intervals of the writes of the CSV change data into Hadoop using the block commit properties (see http://docs.continuent.com/tungsten-replicator-2.2/performance-block.html). By default we commit into Hadoop every 10s or 10,000 rows, but we can change that to, say, every 5s or 1,000 rows if your data is critical and busy.

We’re still optimising and improving the system, but I can tell you that in my own tests we can handle gigabytes of change data and information in a live fashion, both across single-table and multi-table/multi-schema datasets. What’s particularly cool is that if you are using Hadoop as a concentrator for all of your MySQL data for analysis, we can transfer from multiple MySQL servers into Hadoop simultaneously and take advantage of the multi-node Hadoop environment to cope with the load.

Building flexible apps from big data sources

My article on how to build flexible apps on top of the BigInsights platform has been published. This demonstrates a cool way to combine some client-end JavaScript and existing technologies to build a Big Data query interface without developing a specialised application for the purpose.

It’s no secret that a significant proportion of the needs for big data have come from the explosion in Internet technologies. Up until 10-20 years ago, the idea of a public-facing application having more than a few million users was unheard of. Today, even a modest website can have millions of users, and if it’s active, can generate millions of data items every day. The irony is that the very infrastructure and systems that create big data can also work in reverse, and provide some of the better ways to integrate and work with that data. Usefully, InfoSphere® BigInsights™ comes with support for managing and executing data jobs through a simple REST API. And through the Jaql interface, we can run queries and get information directly from a Hadoop cluster. This article looks at how these systems work together to give you a rich basis for capturing data and provide an interface to get the information back out again.

Read: Building flexible apps from big data sources

Process big data with Big SQL in InfoSphere BigInsights

The ability to write an SQL statement against your Big Data stored in Hadoop provides some much needed flexibility. Sure, using Hive or HBase you can perform some of those operations, but there are other alternatives that may suit your needs better, such as the Big SQL utility. My latest article on this tool is provided here:

SQL is a practical querying language, but it has limitations. Big SQL enables you to run complex queries on non-tabular data and query it with an SQL-like language. The difference with Big SQL is that you are accessing data that may be non-tabular, and may in fact not be based upon a typical SQL database structure. Using Big SQL, you can import and process large volume data sets, including by taking the processed output of other processing jobs within InfoSphere BigInsights™ to turn that information into easily query-able data. In this article, we look at how you can replace your existing infrastructure and queries with Big SQL, and how to take more complex queries and convert them to make use of your Big SQL environment.

Read: Process big data with Big SQL in InfoSphere BigInsights

SQL to Hadoop and back again, Part 3: Direct transfer and live data exchange

The third and final article in my series on migrating data to and from Hadoop and SQL databases is now available:

Big data is a term that has been used regularly now for almost a decade, and it, along with technologies like NoSQL, is seen as the replacement for the long-successful RDBMS solutions that use SQL. Today, DB2®, Oracle, Microsoft® SQL Server, MySQL, and PostgreSQL dominate the SQL space and still make up a considerable proportion of the overall market. In this final article of the series, we will look at more automated solutions for migrating data to and from Hadoop. In the previous articles, we concentrated on methods that take exports or otherwise formatted and extracted data from your SQL source, load that into Hadoop in some way, then process or parse it. But if you want to analyze big data, you probably don’t want to wait while exporting the data. Here, we’re going to look at some methods and tools that enable a live transfer of data between your SQL and Hadoop environments.

Read: SQL to Hadoop and back again, Part 3: Direct transfer and live data exchange

SQL to Hadoop and back again, Part 1: Basic data interchange techniques

I’ve got a new article, which is part of a new three-part series, on moving data between SQL and Hadoop, both the export to Hadoop and importing processed content back into an SQL store.

In this first one, we look at the basic mechanics and considerations before you start the migration of data, such as the data format, content, and export techniques.

Read: SQL to Hadoop and back again, Part 1: Basic data interchange techniques

Using Hadoop and Couchbase

My new article on using Hadoop with Couchbase is available now on the IBM developerWorks site.

The article tells you how to integrate the massive map/reduce functionality offered by Hadoop with the query functionality offered in Couchbase.

With this article you also get a live demo of the process in action, and an intro video on the problem we are trying to solve:

Read: Using Hadoop with Couchbase

Happily, the article was also chosen as a feature article for the entire developerWorks site, and came with a cool picture of an elephant sitting on a couch!