Hadoop BoF Session at OSCON

I have a BoF session at OSCON next week:

Migrating Data from MySQL and Oracle into Hadoop

The session is at 7pm Tuesday night – look for rooms D135 and/or D137/138.

Correction: We are now in E144 on Tuesday, with the Hadoop get-together first at 7pm and the Data Migration session to follow at 8pm.

I’m actually going to be joined by Gwen Shapira from Cloudera, who has a BoF session on Hadoop next door at the same time, along with Eric Herman from Booking.com. We’ll use the opportunity to talk all things Hadoop, but particularly the ingestion of data from MySQL and other databases into the Hadoop datastore.

As always, it’d be great to meet anybody interested in Hadoop at the BoF. Please come along and introduce yourselves, and hopefully I’ll see you next week!

Continuent at Hadoop Summit

I’m pleased to say that Continuent will be at the Hadoop Summit in San Jose next week (3-5 June). Sadly I will not be attending as I’m taking an exam next week, but my colleagues Robert Hodges, Eero Teerikorpi and Petri Versunen will be there to answer any questions you have about Continuent products, and, of course, Hadoop replication support built into Tungsten Replicator 3.0.

If you are at the conference, please go along and say hi to the team. And, as always, if there are any questions please let them or me know.

Real-Time Data Movement: The Key to Enabling Live Analytics With Hadoop

An article about moving data into Hadoop in real-time has just been published over at DBTA, written by me and my CEO Robert Hodges.

In the article I talk about one of the major issues for all people deploying databases in the modern heterogeneous world – how do we move and migrate data effectively between entirely different database systems in a way that is efficient and usable? How do you get the data you need to the database you need it in? If your source is a transactional database, how does that data get moved into Hadoop in a way that makes it usable for querying by Hive, Impala or HBase?

You can read the full article here: Real-Time Data Movement: The Key to Enabling Live Analytics With Hadoop

 

Cross your Fingers for Tech14, see you at OSCON

So I’ve submitted my talks for the Tech14 UK Oracle User Group conference which is in Liverpool this year. I’m not going to give away the topics, but you can imagine they are going to be about data translation and movement and how to get your various databases talking together.

I can also say, after having seen other submissions for talks this year (as I’m helping to judge), that the conference is shaping up to be very interesting. There’s a good spread of different topics this year, but I know from having talked to the organisers that they are looking for more submissions in the areas of Operating Systems, Engineered Systems and Development (mobile and cloud).

If you’ve got a paper, presentation, or idea for one that you think would be useful, please go ahead and submit your idea.

I’m also pleased to say that I’ll be at OSCON in Oregon in July, handling a Birds of a Feather (BOF) session on the topic of exchanging data between MySQL, Oracle and Hadoop. I’ll be there with my good friend Eric Herman from Booking.com where we’ll be providing advice, guidance, experiences, and hoping to exchange more ideas, wishes and requirements for heterogeneous environments.

It’d be great to meet you if you want to come along to either conference.

 

 

Harvest machine data using Hadoop and Hive

A new article has been published on IBM developerWorks, looking at the basics of processing machine data using Hadoop, from extracting the core data and storing it to determining the baselines and trigger points required to identify worrying trends and points. From the intro:

Machine data can come in many different formats and quantities. Weather sensors, fitness trackers, and even air-conditioning units produce massive amounts of data, which begs for a big data solution. But how do you decide what data is important, and how do you determine what proportion of that information is valid, worth including in reports, or valuable in detecting alert situations? This article covers some of the challenges and solutions for supporting the consumption of massive machine data sets that use big data technology and Hadoop.

Harvest machine data using Hadoop and Hive.

Tungsten Replicator 3.0 is Cloudera Enterprise 5 Certified

One of the key platforms I’ve been testing on for the MySQL to Hadoop replication has been Cloudera, largely driven by customer requirements, but it’s also one of the easiest ways to get started with Hadoop.

What I’m even more pleased about is that Tungsten Replicator 3.0 is certified for use on the new Cloudera Enterprise 5 platform. That means you can replicate your data from MySQL to Cloudera 5 and be confident that it will work without causing problems or difficulties in the Hadoop loading and materialisation.

Cloudera is a great product, and we’re very happy to be working so effectively with the new Cloudera Enterprise 5. Cloudera certainly makes the core operation of managing and monitoring your Hadoop cluster so much easier, while still providing core functionality from the Hadoop family like Hive, HBase and Impala.

What I’m really interested in is the support for Spark, which will allow much easier live-querying and access to data. That should make some data processing and live data views much easier to build and query further down the line.

Continuent Replication to Hadoop – Now in Stereo!

Hopefully by now you have already seen that we are working on Hadoop replication. I’m happy to say that it is going really well. I’ve managed to push a few terabytes of data and different data sets through into Hadoop on Cloudera, HortonWorks, and Amazon’s Elastic MapReduce (EMR). For those who have been following my long association with the IBM InfoSphere BigInsights Hadoop product, I’m pleased to say that it’s working there too. I’ve had to adapt Robert’s original script to work with the different versions of the underlying Hadoop tools and systems to make it compatible. The actual performance and process is unchanged; you just use a different JS-based batchloader script to work with different tools.

Robert has also been simplifying some of the core functionality, such as configuring some fixed pre-determined formats, so you no longer have to explicitly set the field and record separators.

I’ve also been testing the key feature of being able to integrate the provisioning of information using Sqoop, merging that original Sqooped data into Hadoop, and then following up with the change data that the replicator is effectively transferring over. The system works exactly as I’ve just described – start the replicator, Sqoop the data, materialise the view within Hadoop. It’s that easy; in fact, if you want a deeper demonstration of all of these features, we’ve got a video from my recent webinar session:

Real Time Data Loading from MySQL to Hadoop with New Tungsten Replicator 3.0

If you can’t spare the time, but still want to know about our Hadoop applier, try our short 5-minute video:

Real-time data loading into Hadoop with Tungsten Replicator

While you’re there, check out the Clustering video I did at the same time:

Continuent Tungsten Clustering

And of course, don’t forget that you can see the product and demos live by attending Percona Live in Santa Clara this week (1st-4th April).

Real-Time Data Loading from MySQL to Hadoop using Tungsten Replicator 3.0 Webinar

To follow up and describe some of the methods and techniques behind replicating into Hadoop from MySQL in real-time, and how this can be combined into your data workflow, Continuent are running a webinar, presented by me, that will go over the details and provide a demo of the data replication process.

Real-Time Data Loading from MySQL to Hadoop with New Tungsten Replicator 3.0

Hadoop is an increasingly popular means of analyzing transaction data from MySQL. Up until now mechanisms for moving data between MySQL and Hadoop have been rather limited. The new Continuent Tungsten Replicator 3.0 provides enterprise-quality replication from MySQL to Hadoop. Tungsten Replicator 3.0 is 100% open source, released under a GPL V2 license, and available for download at https://code.google.com/p/tungsten-replicator/. Continuent Tungsten handles MySQL transaction types including INSERT/UPDATE/DELETE operations and can materialize binlogs as well as mirror-image data copies in Hadoop. Continuent Tungsten also has the high performance necessary to load data from busy source MySQL systems into Hadoop clusters with minimal load on source systems as well as Hadoop itself.

This webinar covers the following topics:

– How Hadoop works and why it’s useful for processing transaction data from MySQL
– Setting up Continuent Tungsten replication from MySQL to Hadoop
– Transforming MySQL data within Hadoop to enable efficient analytics
– Tuning replication to maximize performance.

You do not need to be an expert in Hadoop or MySQL to benefit from this webinar. By the end listeners will have enough background knowledge to start setting up replication between MySQL and Hadoop using Continuent Tungsten.

You can join the webinar on 27th March (Thursday), 10am PDT, 1pm EDT, or 5pm GMT by registering here: https://www1.gotomeeting.com/register/225780945

 

 

Real-Time Replication from MySQL to Cassandra

Earlier this month I blogged about our new Hadoop applier, and this week I published the docs for it (http://docs.continuent.com/tungsten-replicator-3.0/deployment-hadoop.html) as part of the Tungsten Replicator 3.0 documentation (http://docs.continuent.com/tungsten-replicator-3.0/index.html). It contains some additional interesting nuggets that will appear in future blog posts.

The main part of that functionality, the part that performs the actual applying into Hadoop, is based around a JavaScript applier engine – there will eventually be docs for that as part of the Batch Applier content (http://docs.continuent.com/tungsten-replicator-3.0/deployment-batchloading.html). The core of this system is that it takes the information from the data stream of the THL and the CSV file that was written by the batch applier system, and runs the commands necessary to load it into Hadoop and perform any necessary merges.

I wanted to see how easy it would be to take that same flexible system and bend it to another database system; in my case, I chose Cassandra.

For the record, it took me a couple of hours to have this working, and I’m guessing another hour will file down some of the rough edges.

Cassandra is interesting as a database because it mixes a big distributed key/value store with a close-enough-to-SQL interface in the form of CQL. And that means we can make use of the CQL to help us perform the merging into the final tables in a manner not dissimilar to the method we use for loading into Vertica.

Back to the JavaScript batch loader: the applier provides five different implementable functions (all are technically optional) that you can use at different stages of the applier process. These are:

  • prepare() – called once when the applier goes online and can be used to create temporary directories or spaces
  • begin() – called at the start of each transaction
  • apply() – called at the end of the transaction once the data file has been written, but before the commit
  • commit() – called after each transaction commit has taken place; this is where we can consolidate info.
  • release() – called when the applier goes offline

We can actually align these functions with a typical transaction – prepare() happens before the statements even start, begin() is the same as BEGIN, apply() happens immediately before COMMIT and commit() happens just after. release() can be used to do any clean up afterwards.

So let’s put this into practice and use it for Cassandra.

The basic process for loading is as follows:

  1. Write a CSV file to load into Cassandra
  2. Load the CSV file into a staging table within Cassandra; this is easy through CQL using the ‘COPY tablename FROM filename’ CQL statement.
  3. Merge the staging table data with a live table to create a carbon copy of our MySQL table content.

For the loading portion, what we’ll do is load the CSV into a staging table, and then we’ll merge the staging table and live table data together during the commit stage of our batch applier. We’ll return to this in more detail.

For the merging, we’ll take the information from the staging table, which includes the sequence number and operation type, and then write the ‘latest’ version of that row and put it into the live table. That gives us a structure like this:

[Figure: Cassandra Loader]

Tungsten Replicator is going to manage this entire process for us – all we need to do is install the replicators, plug in these custom bits, and let it run.

As with the Hadoop applier, what we’re going to do is use the batch applier to generate only insert and delete rows; UPDATE statements will be converted into a delete of the original version and insert of the new version. So:

INSERT INTO sample VALUES (1,'Message')

Is an insert…

DELETE FROM sample WHERE id = 1

Is a delete, and:

UPDATE sample SET message = 'Now you see me' WHERE id = 1

is actually:

DELETE FROM sample WHERE id = 1
INSERT INTO sample VALUES (1,'Now you see me')

This gets round the problem of doing updates, which are expensive in big data stores (particularly Hadoop, which doesn’t support updating existing data), by turning them into a more efficient delete and insert.
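To make the conversion concrete, here is a minimal Ruby sketch of the idea. The expand_change helper and its argument layout are hypothetical names made up for illustration; they are not part of Tungsten Replicator, which does this work internally in the batch applier.

```ruby
# Hypothetical helper (not a Tungsten Replicator API): expand one row change
# into the insert/delete operations described above.
def expand_change(type, key, new_values = nil)
  case type
  when :insert then [[:insert, key, new_values]]
  when :delete then [[:delete, key, nil]]
  # An update becomes a delete of the old version plus an insert of the new one.
  when :update then [[:delete, key, nil], [:insert, key, new_values]]
  else raise ArgumentError, "unknown change type: #{type}"
  end
end

ops = expand_change(:update, 1, { 'id' => 1, 'message' => 'Now you see me' })
ops.each { |op, key, values| puts "#{op} id=#{key} #{values.inspect}" }
```

The point of the sketch is only that the destination never has to modify a row in place; it only ever sees deletes and inserts.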

In the CSV data itself, this is represented by prefixing every row with three fields:

optype, sequence number, unique id

Optype is ‘D’ for a delete and ‘I’ for an insert and is used to identify what needs to be done. The sequence number is the unique transaction ID from the replicator THL. This number increases by one for every transaction, and this means we can always identify the ‘latest’ version of a row, which is important to us when processing the transaction into Cassandra. The unique ID is the primary key (or compound key) from the source data. We need this to ensure we update the right row. To replicate data in this way, we must have a primary key on the data. If you don’t have primary keys, you are probably in a world of hurt anyway, so it shouldn’t be a stretch.
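As a rough illustration of that layout, the following Ruby sketch builds prefixed rows and picks the latest insert by sequence number. The StagedRow struct, its field names and the naive comma join (no quoting or escaping) are assumptions made for the example; the real CSV files are written by the batch applier.

```ruby
# Hypothetical model of one staged CSV row: three prefix fields followed by
# the column values from the source row. Not the replicator's own format code.
StagedRow = Struct.new(:optype, :seqno, :id, :values) do
  # Naive join for illustration only; real CSV output needs quoting.
  def to_csv_line
    ([optype, seqno, id] + values).join(',')
  end
end

rows = [
  StagedRow.new('I', 10, 1, [1, 'Message']),
  StagedRow.new('D', 11, 1, [1, 'Message']),
  StagedRow.new('I', 11, 1, [1, 'Now you see me'])
]
rows.each { |row| puts row.to_csv_line }

# The insert with the highest sequence number is the 'latest' version.
latest = rows.select { |row| row.optype == 'I' }.max_by(&:seqno)
puts "Latest version of id #{latest.id}: #{latest.values.last}"
```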

One difficulty here is that we need to cope with an idiosyncrasy of Cassandra, which is that by default, Cassandra orders fields in the ‘tables’ (really collections of key/values) so that integers and numbers appear first in the table, and text appears last. This is an optimisation that Cassandra makes that complicates things for us, but only in a very small way. For the moment, we’ll handle it by assuming that we are loading only one table with a known format into Cassandra. We could handle multiple tables by using a simple IF statement in the JS and using different formats for that, or we could actually extract the info from the incoming data; I’m going to skip that because it keeps us away from the cool element of actually getting the data in.

Within Cassandra then we have two tables, the table we are loading data into, and the staging table that we load the CSV data into. For our sample, the live schema is ‘sample’, the live table is ‘sample’ and the staging table is ‘staging_sample’.

The definition in Cassandra for the live sample table is:

 CREATE TABLE sample (
 id int,
 message text,
 PRIMARY KEY (id)
 ) WITH
 bloom_filter_fp_chance=0.010000 AND
 caching='KEYS_ONLY' AND
 comment='' AND
 dclocal_read_repair_chance=0.000000 AND
 gc_grace_seconds=864000 AND
 index_interval=128 AND
 read_repair_chance=0.100000 AND
 replicate_on_write='true' AND
 populate_io_cache_on_flush='false' AND
 default_time_to_live=0 AND
 speculative_retry='99.0PERCENTILE' AND
 memtable_flush_period_in_ms=0 AND
 compaction={'class': 'SizeTieredCompactionStrategy'} AND
 compression={'sstable_compression': 'LZ4Compressor'};

And for the staging_sample table:

CREATE TABLE staging_sample (
 optype text,
 seqno int,
 fragno int,
 id int,
 message text,
 PRIMARY KEY (optype, seqno, fragno, id)
 ) WITH
 bloom_filter_fp_chance=0.010000 AND
 caching='KEYS_ONLY' AND
 comment='' AND
 dclocal_read_repair_chance=0.000000 AND
 gc_grace_seconds=864000 AND
 index_interval=128 AND
 read_repair_chance=0.100000 AND
 replicate_on_write='true' AND
 populate_io_cache_on_flush='false' AND
 default_time_to_live=0 AND
 speculative_retry='99.0PERCENTILE' AND
 memtable_flush_period_in_ms=0 AND
 compaction={'class': 'SizeTieredCompactionStrategy'} AND
 compression={'sstable_compression': 'LZ4Compressor'};

I’ve put both tables into the ‘sample’ keyspace.

Remember that idiosyncrasy I mentioned? Here it is: a bare table loading from CSV will actually order the data as:

seqno,uniqno,id,optype,message

This is Cassandra’s way of optimising integers over text to speed up lookups, but for us it is a minor niggle. Right now, I’m going to handle it by assuming we are replicating only one schema/table and we know what the structure of that looks like. Longer term, I want to pull it out of the metadata, but that’s a refinement.

So let’s start by having a look at the basic JS loader script. It’s really the component that is going to handle the core element of the work, managing the CSV files that come in from the batch engine and applying them into Cassandra. Remember, there are five functions that we can define, but for the purposes of this demonstration we’re going to use only two of them: apply(), which will load the CSV file into Cassandra, and commit(), which will perform the steps to merge the staged data.

The apply() function does two things, it identifies the table and schema, and then runs the command to load this data into Cassandra through the cqlsh command-line tool. We actually can’t run CQL directly from this command line, but I wrote a quick shell script that pipes CQL from the command-line into a running cqlsh.

The commit() function on the other hand is simpler, although it does a much more complicated job using another external script, this time written in Ruby.

So this gives us a cassandra.js script for the batch applier that looks like this:

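// Called at the end of each transaction, once the CSV file has been written.
function apply(csvinfo)
{
  var sqlParams = csvinfo.getSqlParameters();
  var csv_file = sqlParams.get("%%CSV_FILE%%");
  // schema and table are deliberately left global so that commit() can use schema.
  schema = csvinfo.schema;
  table = csvinfo.table;
  // Load the CSV file into the staging table through the cqlsh wrapper script.
  runtime.exec("/opt/continuent/share/applycqlsh.sh " + schema + ' "copy staging_' + table + " (optype,seqno,uniqno,id,message) from '" + csv_file + "';\"");
}

// Called after each commit; merges the staged rows into the live table.
function commit()
{
  runtime.exec("/opt/continuent/share/merge.rb " + schema);
}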

So, the apply() function is called for each event as written into the THL from the MySQL binary log, and the content of the CSV file generated at that point contains the contents of the THL event; if it’s one row, it’s a one-row CSV file; if it’s a statement or transaction that created 2000 rows, it’s a 2000 row CSV file.

The csvinfo object that is provided contains information about the batch file that is written, including, as you can see here, the schema and table names, and the sequence number. Note that we could, at this point, pull out table info, but we’re going to concentrate on pulling a single table here just for demo purposes.

The CQL for loading the CSV data is:

COPY staging_tablename (optype,seqno,uniqno,id,message) from 'FILENAME';

This says, copy the specific columns in this order from the file into the specified table. As I mentioned, currently this is hard coded into the applier JS, but would be easy to handle for more complex schemas and structures.
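One way to avoid hard coding the column list would be to look it up per table. The sketch below shows the idea in Ruby; the STAGING_COLUMNS map and the copy_statement helper are hypothetical names invented for this example, and the column order is simply the one used in the statement above.

```ruby
# Hypothetical per-table column map; in practice this could come from
# configuration or from the table metadata.
STAGING_COLUMNS = {
  'sample' => %w[optype seqno uniqno id message]
}.freeze

# Build the COPY statement for a table's staging table (illustrative helper).
def copy_statement(table, csv_file)
  columns = STAGING_COLUMNS.fetch(table) do
    raise ArgumentError, "no column list configured for #{table}"
  end
  "COPY staging_#{table} (#{columns.join(',')}) FROM '#{csv_file}';"
end

puts copy_statement('sample', '/tmp/sample.csv')
```

Failing loudly for an unknown table is a deliberate choice here: loading a CSV file with the wrong column order would silently put values in the wrong fields.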

The commit() function is even simpler, because it just calls a script that will do the merging for us – we’ll get to that in a minute.

So here’s the script that applies an arbitrary CQL statement into Cassandra:

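#!/bin/bash
# Usage: applycqlsh.sh SCHEMA CQL...
# Pipes the CQL given on the command line into cqlsh for the given keyspace.
SCHEMA=$1;shift
echo "$*" | cqlsh -k "$SCHEMA" tr-cassandra2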

Really simple, but gets round a simple issue.

The script that does the merge work is more complex; in other environments we might be able to do this all within SQL, but CQL is fairly limited with no sub-queries. So we do it long-hand using Ruby. The basic sequence is quite simple, and has three steps:

  1. Delete from the live table every row whose unique key appears in the staging table with an optype of D
  2. Insert the *last* version of an insert for each unique ID – the last version will be the latest one in the output. We can pick this out by just iterating over every insert and picking the one with the highest Sequence number as generated by the THL transaction ID.
  3. Delete the content from the staging table because we’ve finished with it. That empties the staging table ready for the next set of transactions.

That file looks like this:

#!/usr/bin/ruby

# Merge staged changes from staging_sample into the live sample table.
require 'cql'

client = Cql::Client.connect(hosts: ['192.168.1.51'])
client.use('sample')

# Step 1: delete every live row that has a 'D' entry in the staging table.
rows = client.execute("SELECT id FROM staging_sample where optype = 'D'")

deleteids = Array.new()

rows.each do |row|
  puts "Found ID #{row['id']} has to be deleted"
  deleteids.push(row['id'])
end

# An empty IN () list is not valid CQL, so skip the delete if nothing was staged.
unless deleteids.empty?
  deleteidlist = deleteids.join(",")
  puts("delete from sample where id in (#{deleteidlist})")
  client.execute("delete from sample where id in (#{deleteidlist})")
end

# Step 2: keep only the insert with the highest seqno for each id.
rows = client.execute("SELECT * FROM staging_sample where optype = 'I'")

updateids = Hash.new()
updatedata = Hash.new()

rows.each do |row|
  id = row['id']
  puts "Found ID #{id} seq #{row['seqno']} has to be inserted"
  if updateids[id].nil? || updateids[id] < row['seqno']
    updateids[id] = row['seqno']
    # Strip the staging-only columns so that only the live columns remain.
    row.delete('seqno')
    row.delete('fragno')
    row.delete('optype')
    updatedata[id] = row
  end
end

updatedata.each do |rowid,rowdata|
  puts "Should update #{rowdata['id']} with #{rowdata['message']}"
  collist = rowdata.keys.join(',')
  substbase = Array.new()
  rowdata.values.each do |value|
    if value.is_a?(String)
      # Quote strings, doubling any embedded single quotes for CQL.
      substbase.push("'" + value.gsub("'", "''") + "'")
    else
      substbase.push(value)
    end
  end

  substlist = substbase.join(',')

  puts('Column list: ',collist)
  puts('Subst list: ',substlist)
  cqlinsert = "insert into sample ("+collist+") values ("+substlist+")"
  puts("Statement: " + cqlinsert)
  client.execute(cqlinsert)
end

# Step 3: empty the staging table ready for the next set of transactions.
client.execute("delete from staging_sample where optype in ('D','I')")

Again, currently, this is hard coded, but I could easily have got the schema/table name from the JS batch applier – the actual code is table agnostic and will work with any table.
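If you want to try the merge logic without a Cassandra cluster to hand, the same three steps can be sketched against plain Ruby hashes. This is only an in-memory illustration: the merge function and the sample data are made up for the example, and it follows the described order (deletes first, then the latest insert per id) rather than reproducing the script line for line.

```ruby
# In-memory sketch: a Hash keyed by id stands in for the live table and an
# Array of row hashes stands in for the staging table.
def merge(live, staging)
  # Step 1: remove every id that has a delete row in staging.
  staging.select { |r| r['optype'] == 'D' }.each { |r| live.delete(r['id']) }

  # Step 2: keep only the insert with the highest seqno for each id.
  latest = {}
  staging.select { |r| r['optype'] == 'I' }.each do |r|
    current = latest[r['id']]
    latest[r['id']] = r if current.nil? || current['seqno'] < r['seqno']
  end
  latest.each do |id, r|
    # Drop the staging-only columns before writing to the live table.
    live[id] = r.reject { |k, _| %w[optype seqno fragno].include?(k) }
  end

  # Step 3: empty the staging data ready for the next batch.
  staging.clear
  live
end

live = { 1 => { 'id' => 1, 'message' => 'Message' } }
staging = [
  { 'optype' => 'D', 'seqno' => 5, 'fragno' => 0, 'id' => 1, 'message' => '' },
  { 'optype' => 'I', 'seqno' => 5, 'fragno' => 0, 'id' => 1, 'message' => 'Now you see me' },
  { 'optype' => 'I', 'seqno' => 4, 'fragno' => 0, 'id' => 2, 'message' => 'old' },
  { 'optype' => 'I', 'seqno' => 6, 'fragno' => 0, 'id' => 2, 'message' => 'new' }
]
merge(live, staging)
live.each { |id, row| puts "#{id}: #{row['message']}" }
```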

So, I’ve set up two replicators – one uses cassandra.js rather than hadoop.js but works the same way – and copied applycqlsh.sh and merge.rb into /opt/continuent/share.

And we’re ready to run. Let’s try it:

mysql> insert into sample values (0,'First Message');
Query OK, 1 row affected (0.01 sec)

We’ve inserted one row. Let’s go check Cassandra:

cqlsh:sample> select * from sample;

id  | message
-----+---------------
489 | First Message

Woohoo – data from MySQL straight into Cassandra.

Now let’s try updating it:

mysql> update sample set message = 'Updated Message' where id = 489;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

And in Cassandra:

cqlsh:sample> select * from sample;

id  | message
-----+-----------------
489 | Updated Message

Bigger woohoo. Not only am I loading data directly into Cassandra, but I can update it as well. Now I can have a stream of updates and information within MySQL replicated over to Cassandra for whatever analysis or information that I need without any issues.

Cool huh? I certainly think so (OK, but I’m biased).

Now I haven’t tested it, but this should just as easily work from Oracle; I’ll be testing that and will let you know.

Any other database destinations people would like to see for replicating into? If so, let me know and I’ll see what I can do.