Continuent Replication to Hadoop – Now in Stereo!

Hopefully by now you have already seen that we are working on Hadoop replication. I’m happy to say that it is going really well. I’ve managed to push a few terabytes of data and different data sets through into Hadoop on Cloudera, HortonWorks, and Amazon’s Elastic MapReduce (EMR). For those who have been following my long association with the IBM InfoSphere BigInsights Hadoop product, I’m pleased to say that it’s working there too. I’ve had to adapt Robert’s original script to make it compatible with the different versions of the underlying Hadoop tools and systems. The actual performance and process are unchanged; you just use a different JS-based batchloader script to work with different tools.

Robert has also been simplifying some of the core functionality, such as configuring some fixed pre-determined formats, so you no longer have to explicitly set the field and record separators.

I’ve also been testing the key feature of integrating the provisioning of information using Sqoop: merging the original Sqooped data into Hadoop, and then following up with the change data that the replicator is transferring over. The system works exactly as I’ve just described – start the replicator, Sqoop the data, materialise the view within Hadoop. It’s that easy; in fact, if you want a deeper demonstration of all of these features, we’ve got a video from my recent webinar session:

Real Time Data Loading from MySQL to Hadoop with New Tungsten Replicator 3.0

If you can’t spare the time, but still want to know about our Hadoop applier, try our short 5-minute video:

Real-time data loading into Hadoop with Tungsten Replicator

While you’re there, check out the Clustering video I did at the same time:

Continuent Tungsten Clustering

And of course, don’t forget that you can see the product and demos live by attending Percona Live in Santa Clara this week (1st-4th April).

Parallel Extractor for Provisioning

Coming up as a new feature in Tungsten Replicator (and written by our replicator expert Stephane Giron) is the ability to provision a new database by using data from an existing database. This new feature comes in the form of a tool called the Parallel Extractor.

The principles are very simple. On the master side:

  • Start the master replicator offline.
  • Switch the replicator to the online provision state.
  • The master replicator pulls the data out of the existing database and writes that information into the Transaction History Log (THL). At this point, the normal replicator thread is not extracting events from the source database.
  • Once the parallel extraction has completed, the replicator switches over to normal extraction mode, and starts writing change data into the THL.

On the slave side, the THL events are read as usual from the master and applied to the slave, but because the provisioned data is inserted into the start of the THL before the main THL thread, the slave reads the provisioned data first, then the data changes that occurred since the provisioning started.

In fact, it’s best to think of it like the diagram below:

Parallel Extractor Blog THL

The parallel extraction happens in a very specific fashion:

A chunking thread identifies all the tables, and also identifies the keys and chunks that can be extracted from each table. It then coordinates the multiple threads:

  • Multiple chunks from the source tables are extracted in parallel.
  • Multiple tables are extracted in parallel.

Because both of these operations happen at the same time, the parallel extractor can pull from multiple tables and multiple chunks, meaning that the actual extraction of the data happens very quickly. In fact, tests are running at a rate of about 80 million rows in 15 minutes, and that was from a single table.

Parallel Extractor Blog Figure

Obviously the number of parallel threads can be controlled, and in fact, the chunking is controlled further by use of a configuration file to determine the chunking configuration.
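
As a rough illustration of the chunking idea (this is not the Parallel Extractor's actual code), the sketch below splits each table's key range into fixed-size chunks and lets a pool of worker threads pull chunks from a shared queue, so that multiple chunks and multiple tables are in flight at once. The table names, key ranges, chunk size, and thread count are all hypothetical, and no database is involved.

```ruby
require 'thread'

# Hypothetical sketch: chunk tables by key range and "extract" the chunks in
# parallel. Extracting a chunk here just records its bounds.

# Split the inclusive key range min_key..max_key into chunks of chunk_size keys.
def chunk_ranges(min_key, max_key, chunk_size)
  (min_key..max_key).step(chunk_size).map do |start|
    [start, [start + chunk_size - 1, max_key].min]
  end
end

# tables maps a table name to its [min_key, max_key] pair.
# Returns the list of [table, low_key, high_key] chunks that were processed.
def parallel_extract(tables, chunk_size, thread_count)
  queue = Queue.new
  tables.each do |name, (min_key, max_key)|
    chunk_ranges(min_key, max_key, chunk_size).each do |low, high|
      queue << [name, low, high]
    end
  end
  # One end marker per worker so that every thread knows when to stop.
  thread_count.times { queue << :done }

  results = Queue.new
  workers = Array.new(thread_count) do
    Thread.new do
      while (job = queue.pop) != :done
        results << job # a real extractor would SELECT this key range here
      end
    end
  end
  workers.each(&:join)
  Array.new(results.size) { results.pop }
end
```

For example, parallel_extract({ "orders" => [1, 10] }, 4, 3) hands three chunks (keys 1-4, 5-8, and 9-10) to three worker threads; the order in which they complete is not guaranteed.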

Currently, the parallel extractor is designed to work for Oracle to MySQL provisioning with Tungsten Replicator, but the same principles can be applied to MySQL-to-MySQL setups. Using the parallel extractor is deceptively simple, and you can check out the current, Oracle-related, instructions here.

What this provides is a very simple way to take an entire existing database full of data and seed your target database with that information by using the replicator. This means the Parallel Extractor could be used to provision new slaves when expanding an existing cluster, to convert a single-machine installation to use replication by seeding the slave with the existing data without needing a backup, or, as currently designed, to seed a heterogeneous replication installation with new data without having to use a complex dump, massage and reload process.

Using the Continuent Docs

As you have hopefully noticed, the Continuent documentation is achieving a pretty good critical mass. The content of the documentation is always the most important consideration. Second is making sure that the information can be found, and that when reading, you can hover and click to get relevant information so that you can understand the content even better.

We’ve got a few different solutions and tips that I think are worth highlighting so that people can use the documentation more effectively.

Searching

When you want to look for something in the documentation, use the search bar right up at the top. The search is available both on the Documentation Library page and within individual documents.

Screen Shot 2014-03-12 at 07.13.22

When used on the Documentation Library page, search shows you potential matches across all the documentation for the word or item you are searching for. For example, here I’ve searched for FAQ. Entries are ranked by manual, according to release:

Screen Shot 2014-03-12 at 07.17.32

When searching within a document, you get shown the items within this document first, followed by matches within other documents:

Screen Shot 2014-03-12 at 07.22.39

The search content itself is heavily indexed and designed so that you should go to the right item as the first one in the list.

It works on wide terms, for example, Filters, but it also works on commands, and on command-line arguments and options within a typical command. For example, type ‘trepctl status’ and you will get not only the key command, but all its derivatives. But type in an option, like ‘-at-event’, and you’ll get the explicit entry for that item.

Screen Shot 2014-03-12 at 07.28.44

Note that the search is very deliberately not a free-text search. This is to ensure that you get to exactly the right page, rather than all the pages that might mention ‘trepctl status’.

Hover Highlights

When reading the documentation you might come across some terms or information that you are not familiar with. In this case, hover over the item and you’ll get a definition.

Screen Shot 2014-03-12 at 07.40.13

Click the highlighted item, and you’ll get taken to the reference page for that specific item.

Deep Linking

I mentioned the mechanics of this process recently, but the use-case within the documentation is that virtually everything of significance is automatically linked to the right, canonical, page for the information.

For example, in the image below, there are links to the various ONLINE and OFFLINE states that can be clicked on, and the same is true for nearly all filenames, options, commands, and all combinations thereof.

Screen Shot 2014-03-12 at 10.12.11

Related Pages

In certain sections, the sidebar lists links to other pages that might be useful to the current discussion, but which we do not link to directly from the text.

This is supported for related pages:

Screen Shot 2014-03-12 at 10.25.58

FAQ entries:

Screen Shot 2014-03-12 at 10.52.26

We don’t have entries yet, but release note and Error/Cause/Solution (troubleshooting) links are supported too. Note that these links only appear on pages that have the related items.

Table of Contents Navigation

Immediately above the related pages is the basic navigation section. This is divided into:

  • Parent Sections – these are sections at the same level as the current page that you might want to jump to. For example, you can easily jump from Fan-In to Star deployments.
  • Navigate Up – Goes up to the parent.
  • Chapters – A list of all the chapters and appendices in this manual.

Other Manuals

For each page in each manual we also provide a link to the same page in other manuals. There are two reasons for this. The first is so that you can compare or jump to differences in other versions of the same manual. The second is so that you can jump between the Tungsten Replicator and Continuent Tungsten manuals if you find yourself on the right page, but in the wrong product manual.

Screen Shot 2014-03-12 at 10.42.58

So as you can see, there’s a lot more to the docs than just the content (critical though it is), and hopefully this has helped to explain how usable the documentation is and, more importantly, how easy it should be to find the information you need.

Intelligent Linking and Indexing in DocBook

One of the issues I have with DocBook XML is that the links are a little forced and manual. 

By that, I mean that if I have a command, like trepctl, and I use it in a sentence or description, then to link trepctl back to the corresponding trepctl page I have to add the link manually, like this:

<link linkend="cmdline-tools-trepctl"><command>trepctl</command></link>

Not only is that a mouthful to say, it’s a lot of keys to type. 

So I’ve fixed that. 

What I do instead is add a <remark> block into the documentation for that command-line page:

<section id="cmdline-tools-trepctl">
<title>The trepctl Command</title>
<remark role="index:canonical" condition="command:trepctl"/>
...
</section>

The ‘role’ attribute specifies that this is an index entry, and that it is canonical, i.e., this is the canonical page for <command>trepctl</command>.

What the page is canonical for is defined in the ‘condition’ attribute.

This means that when I put <command>trepctl</command>, during post-processing, the command is automatically linked to that page without me having to manually do that. 

You can see the effect of this at the top of this page.

It works for anything, and it works for longer fragments too, so I can do ‘Use the <command>trepctl status</command> command’, and the post-processing will automatically link to the canonical page for the trepctl status command. On that same page, you can see links to the field names in the output. 

This uses an extension to the original index reference format: 

<remark role="index:canonical" condition="parameter:appliedLastSeqno:thl"/>

That third argument to the condition attribute gives a ‘hint’ as to what it might apply to. This means that we can link using a commonly used DocBook element, such as parameter, and tag it to link to this canonical page, just by adding a condition attribute to the parameter element, like this:

<parameter condition="thl">appliedLastSeqno</parameter>

OK, so it’s still long, but it’s less complex than writing out <link> or <xref> elements, and it means that I don’t have to know the ID where the information is held; that’s entirely driven from marking up the content with the canonical index entry.

Finally, and perhaps most importantly, you can go to any of the Continuent documentation pages and type either the partial or full command, and it will take you to the canonical page for that command, option, etc. This means that not only is the content heavily linked (making it more useful), but searching is also more likely to land you in the right place.
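
To make the post-processing idea concrete, here is a minimal sketch of how such a pass could work; it is not the actual build tooling. It assumes flat (non-nested) sections, uses regular expressions rather than a real XML parser, and only handles <command> and <parameter> elements. The function names build_canonical_index and autolink are hypothetical.

```ruby
# Hypothetical sketch of the post-processing pass: build an index of canonical
# pages from <remark> markers, then wrap matching elements in <link>.

# Map each condition value (e.g. "command:trepctl") to the id of the section
# that carries the canonical marker. Assumes sections are not nested.
def build_canonical_index(xml)
  index = {}
  xml.scan(/<section id="([^"]+)">(.*?)<\/section>/m).each do |id, body|
    body.scan(/<remark role="index:canonical" condition="([^"]+)"\/>/).flatten.each do |cond|
      index[cond] = id
    end
  end
  index
end

# Wrap <command> and <parameter> elements in a <link> when the index has a
# canonical page for them. A condition attribute on the element is treated
# as the optional third "hint" part of the index key.
def autolink(text, index)
  text.gsub(/<(command|parameter)( condition="([^"]+)")?>([^<]+)<\/\1>/) do
    whole, element, hint, content = $&, $1, $3, $4
    key = [element, content, hint].compact.join(":")
    target = index[key]
    target ? %(<link linkend="#{target}">#{whole}</link>) : whole
  end
end
```

With an index entry of "parameter:appliedLastSeqno:thl", the element <parameter condition="thl">appliedLastSeqno</parameter> gets wrapped in a link to that section, while elements with no canonical entry are left untouched.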

Customizing Chunking in DocBook

I love DocBook XML. No, really. But one thing I hate is the way you have to set a global chunking level for your HTML and then live with it. For most documentation, you want to be able to choose: sometimes you want a conveniently addressable section within a chapter to have its own page, and sometimes you want to combine sections into one page to make them easier to read.

For example, consider this page in the Continuent docs. Technically it’s high enough (I use a default chunking depth of 4) to be chunked, but I want the subsections on one page to make it easier to read.

Custom chunking in DocBook is clunky, so here’s an alternative.

Create a custom copy of your html.online.chunk-common.xsl.

Find the main chunk template definition (around line 996).

Add these two lines to the <xsl:choose> block:

<xsl:when test="$node[@condition='nochunk']">0</xsl:when>
<xsl:when test="$node[@condition='forcechunk']">1</xsl:when>

These two override the implied chunking decision based on object type or depth.

Now in your DocBook XML you can choose whether an item should be chunked or not by adding a condition attribute to your section. To chunk it:

<section id="chunkme" condition="forcechunk">...</section>

To prevent a section from being chunked:

<section id="dontchunkme" condition="nochunk">...</section>
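
The effect of those two tests can be modelled in a few lines. This is only a simplified sketch of the decision: it assumes the two new <xsl:when> tests are evaluated before the stylesheet's own rules, and it reduces those rules to a plain depth comparison, which the real stylesheet does not. The chunk? function and the depth argument are hypothetical.

```ruby
# Hypothetical model of the chunking decision once the two tests are added.
# The depth rule stands in for the stylesheet's own, more involved, logic.
DEFAULT_CHUNK_DEPTH = 4

def chunk?(condition, depth, chunk_depth = DEFAULT_CHUNK_DEPTH)
  return false if condition == "nochunk"    # explicit opt-out wins
  return true if condition == "forcechunk"  # explicit opt-in wins
  depth <= chunk_depth                      # otherwise fall back to depth
end
```

So a depth-3 section would normally get its own page, but chunk?("nochunk", 3) keeps it on its parent's page, and chunk?("forcechunk", 6) splits out a section that is otherwise too deep to be chunked.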

MySQL to Hadoop Step-By-Step

We had a great webinar on Thursday about replicating from MySQL to Hadoop (watch the whole thing). It was great, but one of the questions at the end was ‘is there an easy way to test’.

Sadly we can’t go giving out convenient ready-to-run downloads of these things because of licensing and other complexities, so I want to try and make it as simple and straightforward as possible by giving you the directions to complete it yourself. I’m going to point to the Continuent Documentation every now and then so this is not too crowded, but we should get through it pretty easily.

Major Decisions

For this to work: 

  • We’ll set up two VMs, one the master (running MySQL), the other the slave (running Cloudera)
  • The two VMs must be able to reach each other on the network. It doesn’t matter whether they are running Internal, NAT, or bridge-mode network, they just need to be able to ping and SSH each other. Switch off firewalls to prevent port weirdness.
  • For convenience, update your /etc/hosts to have a host1 (the master) and host2 (the slave)
  • The master must have followed the prereqs; for the slave it’s optional, but highly recommended

With that in mind, let’s get started.

Step 1: Setup your Master Host

There are a number of ways you can do this. If you want to simplify things and have VirtualBox, try downloading this VM. It’s a 1.5GB download containing an OVF VM, and is an Ubuntu host, with our prerequisites followed. To use this:

  1. Uncompress the package.
  2. Import the VM into your VirtualBox.
  3. If you want, change the network type from Internal to a bridged or NAT environment.

Using internal networking, you can log in to this using:

shell> ssh -p2222 tungsten@localhost

Passwords are ‘password’ for tungsten and root.

If you don’t want to follow this but want your own VM:

  1. Create a new VM with 1-2GB of RAM, and 8GB or more of disk space
  2. Install your OS of choice, either Ubuntu or CentOS
  3. Follow our prerequisite instructions
  4. Make sure MySQL is set up and running, and that binary logging is enabled
  5. Make sure it has a valid IP address

Step 2: Setup Tungsten Replicator

Download the latest Tungsten Replicator binary from this page

Unpack the file:

shell> tar zxf tungsten-replicator-3.0.tar.gz

Change into the directory:

shell> cd tungsten-replicator-3.0

Create a new replicator installation; this will read from the binary log into THL:

shell> ./tools/tpm install alpha \
--install-directory=/opt/continuent \
--master=host1 \
--members=host1 \
--java-file-encoding=UTF8 \
--java-user-timezone=GMT \
--mysql-enable-enumtostring=true \
--mysql-enable-settostring=true \
--mysql-use-bytes-for-string=false \
--svc-extractor-filters=colnames,pkey \
--property=replicator.filter.pkey.addColumnsToDeletes=true \
--property=replicator.filter.pkey.addPkeyToInserts=true \
--replication-password=password \
--replication-user=tungsten \
--skip-validation-check=HostsFileCheck \
--skip-validation-check=ReplicationServicePipelines \
--start-and-report=true

For a full description of what’s going on here, see this page and click on the magnifying glass. You’ll get the full description of each option.

To make sure everything is OK, you should get a status report generated by trepctl. If it’s running and it shows the status as online, we’re ready.

Step 3: Get your Cloudera Host Ready

There are lots of ways to get Cloudera’s Hadoop solution installed. The ready-to-run VM is the simplest by far.

  1. Download the Cloudera VM quick start host from here; there are versions for VirtualBox, VMware, and KVM.
  2. Set the networking type to match the master.
  3. Start the host
  4. Set the hostname to host2
  5. Update the networking to an IP address that can talk to the master.
  6. Update /etc/hosts to add the IP address of host1 and host2 e.g.:

192.168.0.2 host1

Add a ‘tungsten’ user which we will use to install Tungsten Replicator.

Step 4: Install your Hadoop Slave

Download the latest Tungsten Replicator binary from this page

Unpack the file:

shell> tar zxf tungsten-replicator-3.0.tar.gz

Change into the directory:

shell> cd tungsten-replicator-3.0

Create a new replicator installation; this will read the information from the master (host1) and apply it to this host (host2):

shell> ./tools/tpm install alpha \
--batch-enabled=true \
--batch-load-language=js \
--batch-load-template=hadoop \
--datasource-type=file \
--install-directory=/opt/continuent \
--java-file-encoding=UTF8 \
--java-user-timezone=GMT \
--master=host1 \
--members=host2 \
'--property=replicator.datasource.applier.csv.fieldSeparator=\\u0001' \
--property=replicator.datasource.applier.csv.useQuotes=false \
--property=replicator.stage.q-to-dbms.blockCommitInterval=1s \
--property=replicator.stage.q-to-dbms.blockCommitRowCount=1000 \
--replication-password=secret \
--replication-user=tungsten \
--skip-validation-check=DatasourceDBPort \
--skip-validation-check=DirectDatasourceDBPort \
--skip-validation-check=HostsFileCheck \
--skip-validation-check=InstallerMasterSlaveCheck \
--skip-validation-check=ReplicationServicePipelines \
--start-and-report=true

For a description of the options, visit this page and click on the second magnifying glass to get the description.

As before, we want everything to be running and the replicator to be online. To check, run:

shell> trepctl status

This should tell you everything is running – if you get an error about this not being found, source the environment to populate your PATH correctly:

shell> source /opt/continuent/share/env.sh

We want everything to be online and running. If it isn’t, use the docs to help determine the reason, or use our discussion group to ask questions.

Step 5: Generating DDL

For your chosen MySQL database schema, you need to generate the staging and live table definitions for Hive.

A tool, ddlscan, is provided for this. You need to run it and provide the JDBC connect string for your database, and your user and password. If you followed the prereqs, use the one for the tungsten user.

First create the live table DDL:

shell> ddlscan -user tungsten -url 'jdbc:mysql://host1:3306/test' -pass password -template ddl-mysql-hive-0.10.vm -db test > schema.sql

Now apply it to Hive:

shell> cat schema.sql | hive

To create Hive tables that read the staging files loaded by the replicator, use the ddl-mysql-hive-0.10-staging.vm template on the same database:

shell> ddlscan -user tungsten -url 'jdbc:mysql://host1:3306/test' -pass password -template ddl-mysql-hive-0.10-staging.vm -db test > schema-staging.sql

Now apply it to Hive again:

shell> cat schema-staging.sql | hive

Step 6: Start Writing Data

Hopefully by this point you’ve got two VMs, one running MySQL and the master replicator extracting info from the MySQL binary log. On the other, you have a basic Cloudera instance with a slave replicator writing changes. Both replicators should be online (use ‘trepctl status’ to check).

All you need to do is start writing data into the tables you selected when creating the DDL. That should be it – you should see data start to stream into Hadoop.

Real-Time Replication from MySQL to Cassandra

Earlier this month I blogged about our new Hadoop applier, and I published the docs for that this week (http://docs.continuent.com/tungsten-replicator-3.0/deployment-hadoop.html) as part of the Tungsten Replicator 3.0 documentation (http://docs.continuent.com/tungsten-replicator-3.0/index.html). It contains some additional interesting nuggets that will appear in future blog posts.

The main part of that functionality, the part that actually applies data into Hadoop, is based around a JavaScript applier engine – there will eventually be docs for that as part of the Batch Applier content (http://docs.continuent.com/tungsten-replicator-3.0/deployment-batchloading.html). The core of this system is that it takes the information from the data stream of the THL and the CSV file that was written by the batch applier system, and runs the commands necessary to load it into Hadoop and perform any necessary merges.

I wanted to see how easy it would be to take that same flexible system and bend it to another database system; in my case, I chose Cassandra.

For the record, it took me a couple of hours to have this working, and I’m guessing another hour will file down some of the rough edges.

Cassandra is interesting as a database because it mixes a big distributed key/value store with a close-enough-to-SQL interface in the form of CQL. And that means we can make use of the CQL to help us perform the merging into the final tables in a manner not dissimilar to the method we use for loading into Vertica.

Back to the JavaScript batch loader: the applier provides five different implementable functions (all are technically optional) that you can use at different stages of the applier process. These are:

  • prepare() – called once when the applier goes online and can be used to create temporary directories or spaces
  • begin() – called at the start of each transaction
  • apply() – called at the end of the transaction once the data file has been written, but before the commit
  • commit() – called after each transaction commit has taken place; this is where we can consolidate info.
  • release() – called when the applier goes offline

We can actually align these functions with a typical transaction – prepare() happens before the statements even start, begin() is the same as BEGIN, apply() happens immediately before COMMIT and commit() happens just after. release() can be used to do any clean up afterwards.
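
The call order can be modelled with a small driver. This is only a sketch of the lifecycle described above, written in Ruby for illustration: the real hooks are JavaScript functions called by the replicator, and the run_batches driver, the hooks hash, and the assumption of one CSV file per transaction are all hypothetical simplifications.

```ruby
# Hypothetical model of the hook call order. hooks maps a hook name to a
# callable; any hook may be missing, mirroring the "all optional" rule.
def run_batches(csv_files, hooks)
  hooks[:prepare]&.call            # once, when the applier goes online
  csv_files.each do |csv_file|
    hooks[:begin]&.call            # start of the transaction
    hooks[:apply]&.call(csv_file)  # data file written, before the commit
    hooks[:commit]&.call           # after the commit has taken place
  end
  hooks[:release]&.call            # once, when the applier goes offline
end
```

Passing only :apply and :commit callables, as the Cassandra example later does with its two functions, simply skips the other three stages.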

So let’s put this into practice and use it for Cassandra.

The basic process for loading is as follows:

  1. Write a CSV file to load into Cassandra
  2. Load the CSV file into a staging table within Cassandra; this is easy through CQL using the ‘COPY tablename FROM filename’ CQL statement.
  3. Merge the staging table data with a live table to create a carbon copy of our MySQL table content.

For the loading portion, what we’ll do is load the CSV into a staging table, and then we’ll merge the staging table and live table data together during the commit stage of our batch applier. We’ll return to this in more detail.

For the merging, we’ll take the information from the staging table, which includes the sequence number and operation type, and then write the ‘latest’ version of that row and put it into the live table. That gives us a structure like this:

Cassandra Loader

Tungsten Replicator is going to manage this entire process for us – all we need to do is install the replicators, plug in these custom bits, and let it run.

As with the Hadoop applier, what we’re going to do is use the batch applier to generate only insert and delete rows; UPDATE statements will be converted into a delete of the original version and insert of the new version. So:

INSERT INTO sample VALUES (1,'Message')

Is an insert…

DELETE FROM sample WHERE id = 1

Is a delete, and:

UPDATE sample SET message = 'Now you see me' WHERE id = 1

is actually:

DELETE FROM sample WHERE id = 1
INSERT INTO sample VALUES (1,'Now you see me')

This turns updates (which in big data stores are expensive, particularly Hadoop which doesn’t support updating existing data) into a more efficient delete and insert.

In the CSV data itself, this is represented by prefixing every row with three fields:

optype, sequence number, unique id

Optype is ‘D’ for a delete and ‘I’ for an insert and is used to identify what needs to be done. The sequence number is the unique transaction ID from the replicator THL. This number increases by one for every transaction, and this means we can always identify the ‘latest’ version of a row, which is important to us when processing the transaction into Cassandra. The unique ID is the primary key (or compound key) from the source data. We need this to ensure we update the right row. To replicate data in this way, we must have a primary key on the data. If you don’t have primary keys, you are probably in a world of hurt anyway, so it shouldn’t be a stretch.
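
To illustrate the conversion, here is a minimal sketch of turning a row change into prefixed staging rows. It is not the batch applier's code: the change hash layout and the staging_rows function are hypothetical, and delete rows carry only the three prefix fields to keep the example short.

```ruby
# Hypothetical sketch: convert one row change into staging rows of the form
# [optype, seqno, unique id, column values...].
# change is a hash with :type (:insert, :delete or :update), :seqno,
# :key (the primary key value) and :row (the new column values).
def staging_rows(change)
  delete_row = ["D", change[:seqno], change[:key]]
  insert_row = ["I", change[:seqno], change[:key], *change[:row]]
  case change[:type]
  when :insert then [insert_row]
  when :delete then [delete_row]
  when :update then [delete_row, insert_row] # delete old version, insert new
  else raise ArgumentError, "unknown change type: #{change[:type].inspect}"
  end
end
```

An update at sequence number 3 that sets the message of row 1 to 'Now you see me' therefore produces a D row followed by an I row, both carrying seqno 3 and key 1.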

One difficulty here is that we need to cope with an idiosyncrasy of Cassandra, which is that by default, Cassandra orders fields in the ‘tables’ (really collections of key/values) so that integers and numbers appear first in the table, and text appears last. This is an optimisation that Cassandra makes that complicates things for us, but only in a very small way. For the moment, we’ll handle it by assuming that we are loading only one table with a known format into Cassandra. We could handle multiple tables by using a simple IF statement in the JS and using different formats for that, or we could actually extract the info from the incoming data; I’m going to skip that because it keeps us away from the cool element of actually getting the data in.

Within Cassandra then we have two tables, the table we are loading data into, and the staging table that we load the CSV data into. For our sample, the live schema is ‘sample’, the live table is ‘sample’ and the staging table is ‘staging_sample’.

The definitions for these in Cassandra are as follows. For the sample live table:

 CREATE TABLE sample (
 id int,
 message text,
 PRIMARY KEY (id)
 ) WITH
 bloom_filter_fp_chance=0.010000 AND
 caching='KEYS_ONLY' AND
 comment='' AND
 dclocal_read_repair_chance=0.000000 AND
 gc_grace_seconds=864000 AND
 index_interval=128 AND
 read_repair_chance=0.100000 AND
 replicate_on_write='true' AND
 populate_io_cache_on_flush='false' AND
 default_time_to_live=0 AND
 speculative_retry='99.0PERCENTILE' AND
 memtable_flush_period_in_ms=0 AND
 compaction={'class': 'SizeTieredCompactionStrategy'} AND
 compression={'sstable_compression': 'LZ4Compressor'};

And for the staging_sample table:

CREATE TABLE staging_sample (
 optype text,
 seqno int,
 fragno int,
 id int,
 message text,
 PRIMARY KEY (optype, seqno, fragno, id)
 ) WITH
 bloom_filter_fp_chance=0.010000 AND
 caching='KEYS_ONLY' AND
 comment='' AND
 dclocal_read_repair_chance=0.000000 AND
 gc_grace_seconds=864000 AND
 index_interval=128 AND
 read_repair_chance=0.100000 AND
 replicate_on_write='true' AND
 populate_io_cache_on_flush='false' AND
 default_time_to_live=0 AND
 speculative_retry='99.0PERCENTILE' AND
 memtable_flush_period_in_ms=0 AND
 compaction={'class': 'SizeTieredCompactionStrategy'} AND
 compression={'sstable_compression': 'LZ4Compressor'};

I’ve put both tables into a ‘sample’ collection.

Remember that idiosyncrasy I mentioned? Here it is: a bare table loading from CSV will actually order the data as:

seqno,uniqno,id,optype,message

This is Cassandra’s way of optimising integers over text to speed up lookups, but for us it is a minor niggle. Right now, I’m going to handle it by assuming we are replicating only one schema/table and that we know what the structure of that looks like. Longer term, I want to pull it out of the metadata, but that’s a refinement.

So let’s start by having a look at the basic JS loader script. It’s really the component that is going to handle the core element of the work: managing the CSV files that come in from the batch engine and applying them into Cassandra. Remember, there are five functions that we can define, but for the purposes of this demonstration we’re going to use only two of them: apply(), which will load the CSV file into Cassandra, and commit(), which will perform the steps to merge the staged data.

The apply() function does two things, it identifies the table and schema, and then runs the command to load this data into Cassandra through the cqlsh command-line tool. We actually can’t run CQL directly from this command line, but I wrote a quick shell script that pipes CQL from the command-line into a running cqlsh.

The commit() function on the other hand is simpler, although it does a much more complicated job using another external script, this time written in Ruby.

So this gives us a cassandra.js script for the batch applier that looks like this:

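// Called once the CSV file for a transaction has been written: load it into
// the staging table by handing a COPY statement to the cqlsh wrapper script.
function apply(csvinfo)
{
  sqlParams = csvinfo.getSqlParameters();
  csv_file = sqlParams.get("%%CSV_FILE%%");
  // schema and table are left global so that commit() can see the schema
  schema = csvinfo.schema;
  table = csvinfo.table;
  // The column list is hard coded for the single sample table
  runtime.exec("/opt/continuent/share/applycqlsh.sh " + schema + ' "copy staging_' + table + " (optype,seqno,uniqno,id,message) from '" + csv_file + "';\"");
}

// Called after the commit: merge the staged rows into the live table.
function commit()
{
  runtime.exec("/opt/continuent/share/merge.rb " + schema);
}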

So, the apply() function is called for each event as written into the THL from the MySQL binary log, and the content of the CSV file generated at that point contains the contents of the THL event; if it’s one row, it’s a one-row CSV file; if it’s a statement or transaction that created 2000 rows, it’s a 2000 row CSV file.

The csvinfo object that is provided contains information about the batch file that is written, including, as you can see here, the schema and table names, and the sequence number. Note that we could, at this point, pull out table info, but we’re going to concentrate on pulling a single table here just for demo purposes.

The CQL for loading the CSV data is:

COPY staging_tablename (optype,seqno,uniqno,id,message) from 'FILENAME';

This says, copy the specific columns in this order from the file into the specified table. As I mentioned, currently this is hard coded into the applier JS, but would be easy to handle for more complex schemas and structures.

The commit() function is even simpler, because it just calls a script that will do the merging for us – we’ll get to that in a minute.

So here’s the script that applies an arbitrary CQL statement into Cassandra:

#!/bin/bash
# Usage: applycqlsh.sh KEYSPACE CQL-STATEMENT...
SCHEMA=$1;shift
echo "$*" | cqlsh -k "$SCHEMA" tr-cassandra2

Really simple, but gets round a simple issue.

The script that does the merge work is more complex; in other environments we might be able to do this all within SQL, but CQL is fairly limited, with no sub-queries. So we do it long-hand using Ruby. The basic sequence is quite simple, and has three steps:

  1. Delete every row in the live table whose unique key matches a row in the staging table with an optype of D
  2. Insert the *last* version of an insert for each unique ID – the last version will be the latest one in the output. We can pick this out by just iterating over every insert and picking the one with the highest Sequence number as generated by the THL transaction ID.
  3. Delete the content from the staging table because we’ve finished with it. That empties the staging table ready for the next set of transactions.
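
Before looking at the real script, the same sequence can be sketched against plain in-memory data. This is a simplified model under stated assumptions, not the script itself: the live table is a hash keyed by id, the staging table is an array of row hashes, and the merge_staging function is hypothetical.

```ruby
# Hypothetical in-memory model of the merge sequence.
# live: hash of id => row hash. staging: array of row hashes that carry
# 'optype', 'seqno', 'fragno' and 'id' alongside the data columns.
STAGING_ONLY_COLUMNS = %w[optype seqno fragno].freeze

def merge_staging(live, staging)
  # Step 1: remove every live row that has a delete (D) row in staging.
  staging.each { |row| live.delete(row['id']) if row['optype'] == 'D' }

  # Step 2: for each id, keep only the insert (I) row with the highest seqno.
  latest = {}
  staging.each do |row|
    next unless row['optype'] == 'I'
    current = latest[row['id']]
    latest[row['id']] = row if current.nil? || current['seqno'] < row['seqno']
  end
  latest.each do |id, row|
    live[id] = row.reject { |column, _| STAGING_ONLY_COLUMNS.include?(column) }
  end

  # Step 3: empty the staging table ready for the next set of transactions.
  staging.clear
  live
end
```

One consequence of running the deletes before the inserts is that a row inserted and then deleted within the same batch would reappear in the live table; comparing the sequence numbers of the D and I rows would be one way to tighten that up.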

That file looks like this:

#!/usr/bin/ruby

require 'cql'

client = Cql::Client.connect(hosts: ['192.168.1.51'])
client.use('sample')

# Step 1: delete every live row that has a delete (D) row in staging
rows = client.execute("SELECT id FROM staging_sample where optype = 'D'")

deleteids = Array.new()

rows.each do |row|
  puts "Found ID #{row['id']} has to be deleted"
  deleteids.push(row['id'])
end

# Nothing to delete if no D rows were found
unless deleteids.empty?
  deleteidlist = deleteids.join(",")
  puts("delete from sample where id in (#{deleteidlist})")
  client.execute("delete from sample where id in (#{deleteidlist})")
end

# Step 2: for each ID, keep only the insert (I) row with the highest seqno
rows = client.execute("SELECT * FROM staging_sample where optype = 'I'")

updateids = Hash.new()
updatedata = Hash.new()

rows.each do |row|
  id = row['id']
  puts "Found ID #{id} seq #{row['seqno']} has to be inserted"
  if updateids[id].nil? || updateids[id] < row['seqno']
    updateids[id] = row['seqno']
    # Strip the staging-only columns so only the live columns remain
    row.delete('seqno')
    row.delete('fragno')
    row.delete('optype')
    updatedata[id] = row
  end
end

updatedata.each do |rowid,rowdata|
  puts "Should update #{rowdata['id']} with #{rowdata['message']}"
  collist = rowdata.keys.join(',')
  substbase = Array.new()
  rowdata.values.each do |value|
    if value.is_a?(String)
      # Quote strings, doubling any embedded single quotes
      substbase.push("'" + value.gsub("'", "''") + "'")
    else
      substbase.push(value)
    end
  end

  substlist = substbase.join(',')

  puts('Column list: ',collist)
  puts('Subst list: ',substlist)
  cqlinsert = "insert into sample ("+collist+") values ("+substlist+")"
  puts("Statement: " + cqlinsert)
  client.execute(cqlinsert)
end

# Step 3: empty the staging table ready for the next set of transactions
client.execute("delete from staging_sample where optype in ('D','I')")

Again, currently, this is hard coded, but I could easily have got the schema/table name from the JS batch applier – the actual code is table agnostic and will work with any table.
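To make the merge rules easier to follow without a Cassandra instance, here is a small in-memory sketch of the first two phases using plain Ruby hashes. The function name merge_staging, the sample sequence numbers and the use of uniqno as the third metadata column are assumptions for illustration only; emptying the staging table afterwards is left out.

```ruby
# In-memory illustration of the merge rules; no Cassandra connection needed.
# `table` maps id => row hash; `staging` is an array of staged change rows.
# merge_staging is a hypothetical name, not part of the replicator.
def merge_staging(table, staging)
  merged = table.dup

  # Phase 1: remove every row that has a staged delete
  staging.each { |row| merged.delete(row['id']) if row['optype'] == 'D' }

  # Phase 2: for each id keep only the insert with the highest seqno
  latest = {}
  staging.each do |row|
    next unless row['optype'] == 'I'
    current = latest[row['id']]
    latest[row['id']] = row if current.nil? || current['seqno'] < row['seqno']
  end

  # Strip the staging metadata before writing the row into the table
  latest.each do |id, row|
    merged[id] = row.reject { |key, _| %w[optype seqno uniqno].include?(key) }
  end

  merged
end

# An insert followed by an update, which arrives as a delete plus an insert
staging = [
  { 'optype' => 'I', 'seqno' => 10, 'uniqno' => 1, 'id' => 489, 'message' => 'First Message' },
  { 'optype' => 'D', 'seqno' => 11, 'uniqno' => 1, 'id' => 489 },
  { 'optype' => 'I', 'seqno' => 11, 'uniqno' => 2, 'id' => 489, 'message' => 'Updated Message' }
]

p merge_staging({}, staging)
```

Because deletes are applied before inserts, an update staged as a delete plus an insert leaves only the newest version of the row in place.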

So, I’ve set up two replicators – one uses cassandra.js rather than hadoop.js but works the same way – and copied applycqlsh.sh and merge.rb into /opt/continuent/share.

And we’re ready to run. Let’s try it:

mysql> insert into sample values (0,'First Message');
Query OK, 1 row affected (0.01 sec)

We’ve inserted one row. Let’s go check Cassandra:

cqlsh:sample> select * from sample;

id  | message
-----+---------------
489 | First Message

Woohoo – data from MySQL straight into Cassandra.

Now let’s try updating it:

mysql> update sample set message = 'Updated Message' where id = 489;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

And in Cassandra:

cqlsh:sample> select * from sample;

id  | message
-----+-----------------
489 | Updated Message

Bigger woohoo. Not only am I loading data directly into Cassandra, but I can update it as well. Now I can have a stream of updates and information within MySQL replicated over to Cassandra for whatever analysis or information I need, without any issues.

Cool huh? I certainly think so (OK, but I’m biased).

Now I haven’t tested it, but this should just as easily work from Oracle; I’ll be testing that and let you know.

Any other database destinations people would like to see for replicating into? If so, let me know and I’ll see what I can do.

Getting Data into Hadoop in real-time

Moving data between databases is hard. Without ever intending it, I seem to have spent a lifetime working on solutions for getting data into and out of databases, but more frequently between. In fact, my first job out of university was migrating data from BRS/Text, a free-text database (probably what we would call a NoSQL) into a more structured Oracle.

Today I spend some of my time working in Big Data, more often than not migrating information from existing data stores into Big Data so that it can be analysed, something I covered in more detail here:

http://www.ibm.com/developerworks/library/bd-sqltohadoop1/index.html
http://www.ibm.com/developerworks/library/bd-sqltohadoop2/index.html
http://www.ibm.com/developerworks/library/bd-sqltohadoop3/

The problem with the current techniques, Sqoop included, is that they rely on a relatively manual, even basic, transfer process. Dump your data out, reload it back again into Hadoop.

Even with Sqoop, although it automates much of the process, it is not entirely reliable, especially if you want to do more than simple dump and load. Serial loading, or incrementally transferring data from MySQL or Oracle, is fraught with problems, not least of which is that it requires adding a timestamp to your data structure to get the best results out of it.

Perhaps worse, though, is that Sqoop is an intermittent, even periodic, transfer system. Incremental loading works by copying all the changed records since a specific point in time. Running it too frequently is counterproductive, which means you end up running it every 15 minutes or every couple of hours, depending on your database activity.

Most databases have some kind of stream of changes that enables you to see everything that has happened on the database. With MySQL, that’s the binary log. And with the Open Source Tungsten Replicator tool we take advantage of that so that we can replicate into MySQL, and indeed into Oracle, MongoDB and Vertica, among others.

Reading the data out from MySQL is lightweight, since the master just reads the contents of the binary log. That is especially true compared to Sqoop, which uses read locks and SELECT * with and without LIMIT clauses.

Right now we’re working on an applier that writes that data into Hadoop in real time from MySQL. Unlike Sqoop, we provide a continuous stream of changes from MySQL into the immutable store of Hadoop.

But the loading and nature of Hadoop presents some interesting issues, not least of which (if you’ve been following my other articles) is the fact that data written into Hadoop is immutable. For data that is constantly changing, an immutable store is not the obvious destination.

We get round that by using the batch loading system to create CSV files that contain the data, changes and sequence numbers, and then loading that information into Hadoop. In fact, Robert has updated the batch loader to use a new JavaScript based system (of which more in a future blog post) that simplifies the entire process, without requiring a direct connection or interface to Hadoop (although we can write directly into HDFS).

For example, the MySQL row:


| 3 | #1 Single | 2006 | Cats and Dogs (#1.4) |

Is represented within the staging files generated as:


I^A1318^A3^A3^A#1 Single^A2006^ACats and Dogs (#1.4)

That’s directly accessible by Hive. In fact, using our ddlscan tool, we can even create the Hive table definitions for you:


ddlscan -user tungsten -url 'jdbc:mysql://host1:13306/test' -pass password \
-template ddl-mysql-hive-0.10.vm -db test

Then we can use that record of changes to create a live version of the data, using a straightforward query within Hive. In fact, Hive provides the final crucial stage of the loading process by giving us that live view of the change data, and we simplify that element by providing the core data, and ensuring that the CSV data is in the right format for Hive to use the files without changes.
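To make the staging format concrete, here is a minimal Ruby sketch that splits the example staging line shown earlier into its parts. The ^A in that line is the Ctrl-A character (0x01). The parser name, the column names (id, title, year, episode) and my reading of the third field as a per-row number are all assumptions for illustration; only the operation type and sequence number are described in the text.

```ruby
# Ctrl-A (0x01) is the field separator shown as ^A in the example line
FIELD_SEPARATOR = "\x01"

# Hypothetical parser: split one staging line into metadata and column data.
# The meaning of the third field (a per-row number) is an assumption.
def parse_staging_line(line, column_names)
  fields = line.chomp.split(FIELD_SEPARATOR, -1)
  optype, seqno, rowno, *values = fields
  {
    optype: optype,
    seqno: Integer(seqno),
    rowno: Integer(rowno),
    data: column_names.zip(values).to_h
  }
end

# Rebuild the example line from the text, then parse it
line = ['I', '1318', '3', '3', '#1 Single', '2006', 'Cats and Dogs (#1.4)'].join(FIELD_SEPARATOR)
record = parse_staging_line(line, %w[id title year episode])
puts record[:data]['title']
```

With the operation type and sequence number pulled out like this, picking the latest version of each row is a matter of grouping by key and keeping the highest sequence number, which is what the Hive query does for us.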

The process is quite remarkable. Speed-wise, for direct dumps, Tungsten Replicator is comparable to Sqoop, but when it comes to change data, the difference is that we have the information in real time. You don’t have to wait for the next Sqoop load, or for the incremental loading and row selection of Sqoop. Instead, we just apply the changes written into the binary log.

Of course, we can fine tune the intervals of the writes of the CSV change data into Hadoop using the block commit properties (see http://docs.continuent.com/tungsten-replicator-2.2/performance-block.html). For example, this means by default we commit into Hadoop every 10s or 10,000 rows, but we can change it to commit every 5s or 1,000 rows if your data is critical and busy.
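The "every N seconds or N rows, whichever comes first" behaviour can be sketched in a few lines of Ruby. This is not the replicator's implementation, and the class and parameter names are hypothetical; it only illustrates the idea of a commit triggered by either a row count or an elapsed interval. For simplicity, the sketch checks the clock only when a row arrives, whereas a real system would also commit on a timer.

```ruby
# Hypothetical buffer that commits after max_rows rows or max_seconds seconds,
# whichever comes first. The clock is injectable so the logic can be tested.
class BlockCommitBuffer
  attr_reader :commits

  def initialize(max_rows:, max_seconds:, clock: -> { Time.now.to_f })
    @max_rows = max_rows
    @max_seconds = max_seconds
    @clock = clock
    @rows = []
    @commits = [] # each entry is one committed batch of rows
    @last_commit = @clock.call
  end

  # Buffer a row and commit if either threshold has been reached
  def add(row)
    @rows << row
    commit if @rows.length >= @max_rows ||
              @clock.call - @last_commit >= @max_seconds
  end

  # Write out the buffered rows as one batch and restart the interval
  def commit
    @commits << @rows unless @rows.empty?
    @rows = []
    @last_commit = @clock.call
  end
end

buffer = BlockCommitBuffer.new(max_rows: 1000, max_seconds: 5)
2500.times { |i| buffer.add(i) }
buffer.commit
puts buffer.commits.map(&:length).inspect
```

Smaller thresholds mean fresher data in Hadoop at the cost of more, smaller files; larger ones trade latency for fewer writes.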

We’re still optimising and improving the system, but I can tell you that in my own tests we can handle gigabytes of change data in a live fashion, across both single-table and multi-table/multi-schema datasets. What’s particularly cool is that if you are using Hadoop as a concentrator for all of your MySQL data for analysis, we can transfer from multiple MySQL servers into Hadoop simultaneously and take advantage of the multi-node Hadoop environment to cope with the load.