Replicating Oracle Webinar Question Follow-up

We had a really great webinar on replicating to/from Oracle earlier this month, and you can view the recording of that webinar here.

A good sign of how great a webinar was is the number of questions that come afterwards, and we didn’t get through them all, so here are all the questions and answers for the entire webinar.

Q: What is the overhead of Replicator on source database with asynchronous CDC?

A: With asynchronous operation there is no substantial CPU overhead (unlike synchronous CDC), but more redo log is generated, which requires more disk space and better log management to ensure that the space is used effectively.

Q: Do you support migration from Solaris/Oracle to Linux/Oracle?

A: Replication is not certified for use on Solaris. However, it is possible to configure a replicator to operate remotely: install Tungsten Replicator on Linux and have it extract from the remote Oracle instance.

Q: Are there issues in supporting tables without Primary Keys on Oracle to Oracle replication?

A: Tables without primary keys will work, but this is not recommended for production as it implies significant overhead when applying to a target database.

Q: On Oracle->Oracle replication, if there are triggers on source tables, how is this handled?

A: Tungsten Replicator does not automatically disable triggers. The best solution is to remove the triggers on slaves, or to rewrite them so that each trigger identifies whether it is being executed on the master or a slave and skips its work accordingly.

Q: How is your offering different/better than Oracle Streams replication?

A: We like to think of ourselves as GoldenGate without the price tag. The main difference is the way we extract the information from Oracle; otherwise, the products offer similar functionality. For Tungsten Replicator in particular, one advantage is its open and flexible nature, since Tungsten Replicator is open source, released under a GPL V2 license, and available at https://code.google.com/p/tungsten-replicator/.

Q: How is the integrity of the replica maintained/verified?

A: Replicator has built-in real-time consistency checks: if an UPDATE or DELETE doesn’t update any rows, Replicator will go OFFLINE:ERROR, as this indicates an inconsistent dataset.
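
As an illustration only, the idea behind this check can be sketched in a few lines of Ruby. The in-memory hash standing in for the target table, the apply_change function and the InconsistentReplicaError class are all hypothetical names for this sketch and are not part of Tungsten Replicator:

```ruby
# Raised when a change cannot be matched to an existing row on the replica.
class InconsistentReplicaError < StandardError; end

# Apply one row change to an in-memory "table" (a Hash keyed by primary key).
# An UPDATE or DELETE that matches no row means the replica has diverged,
# so we stop instead of silently continuing.
def apply_change(table, change)
  id = change[:id]
  case change[:op]
  when :insert
    table[id] = change[:row]
  when :update
    raise InconsistentReplicaError, "UPDATE matched no row for id #{id}" unless table.key?(id)
    table[id] = change[:row]
  when :delete
    raise InconsistentReplicaError, "DELETE matched no row for id #{id}" unless table.key?(id)
    table.delete(id)
  else
    raise ArgumentError, "unknown operation #{change[:op].inspect}"
  end
  table
end

replica = { 1 => { message: 'hello' } }
apply_change(replica, op: :update, id: 1, row: { message: 'changed' })

begin
  apply_change(replica, op: :delete, id: 99)
rescue InconsistentReplicaError => e
  puts "replica would go offline: #{e.message}"
end
```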

Q: Can configuration file based passwords be specified using some form of encrypted value for security purposes to keep them out of the clear?

A: We support an INI file format so that you do not have to use the command-line installation process. There is currently no supported option for an encrypted version of these values, but the INI file can be secured so it is only readable by the Tungsten user.
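
Locking the file down is a simple permission change. As a minimal Ruby sketch (the temporary file here stands in for your real INI file, whose location depends on your installation; its contents are placeholders, and secure_config_file is a hypothetical helper name):

```ruby
require 'tempfile'

# Restrict a configuration file so that only its owner can read or write it,
# and return the resulting permission bits.
def secure_config_file(path)
  File.chmod(0o600, path)
  File.stat(path).mode & 0o777
end

# Demonstration against a temporary file standing in for the real INI file.
demo = Tempfile.new(['tungsten', '.ini'])
demo.write("[defaults]\nreplication-password=secret\n")
demo.flush
printf("mode is now %o\n", secure_config_file(demo.path))
```

Run this as the Tungsten user so that the file’s owner is the account that needs to read it.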

Q: Our source DB is Oracle RAC with ~10 instances. Is coherency maintained in the replication from activity in the various instances?

A: We do not monitor the information that has been replicated; but CDC replicates row-based data, not statements, so typical sequence insertion issues that might occur with statement based replication should not apply.

Q: Is there any maintenance of Oracle sequence values between Oracle and replicas?

A: Sequence values are recorded into the row data as extracted by Tungsten Replicator. Because the inserted values, not the sequence itself, are replicated, there is no need to maintain sequences between hosts.

Q: How timely is the replication? Particularly for hot source tables receiving millions of rows per day?

A: CDC is based on extracting the data at an interval, but the interval can be configured. In practice, assuming there are regular inserts and updates on the Oracle side, the data is replicated in real-time. See https://docs.continuent.com/tungsten-replicator-3.0/deployment-oracle-cdctuning.html for more information on how this figure can be tuned.

Q: Can parallel extractor instances be spread across servers rather than through threads on the same server (which would be constrained by network or HBA)?

A: Yes. We can install multiple replicators and tune the extraction of the parallel extractor accordingly. That selection would need to be made manually, but it is certainly possible.

Q: Do you need the CSV file (to select individual tables with the setupCDC.sh configuration) on the master setup if you want all tables?

A: No.

Q: If you lose your slave down the road, do you need to re-provision from the initial SCN number or is there a way to start from a later point?

A: This is the reason for the THL Sequence Number introduced in the extractor. If you lose your slave, you can install a new slave and have it start at the transaction number where the failed slave stopped if you know it, since the information will be in the THL. If not, you can usually determine this by examining the THL directly. There should be no need to re-provision – just to restart from the transaction in the THL on the master.

Q: Regarding a failed slave, what if it failed such that we don’t have a backup, or we wanted to provision a second slave that had no initial data?

A: If you had no backups or data, yes, you would need to re-provision with the parallel extractor in order to seed the target database.

Q: Would you do that with the original SCN? If it had been a month or two, is there a way to start at a more recent SCN (e.g. you have to re-run the setupCDC process)?

A: The best case is to have two MySQL slaves and, when one fails, re-provision it from the healthy one. This avoids the setupCDC stage.

However, the replication can always be started from a specific event (SCN) provided that SCN is available in the Oracle undo log space.

Q: How does Tungsten handle Oracle’s CLOB and BLOB data types?

A: Provided you are using asynchronous CDC, these types are supported; for synchronous CDC these types are not supported by Oracle.

Q: Can different schemas in Oracle be replicated at different times?

A: Each schema is extracted by a separate service in Replicator, so they are independent.

Q: What is the size limit for BLOB or CLOB column data types?

A: This depends on the CDC capabilities in Oracle, and is not limited within Tungsten Replicator. You may want to refer to the Oracle Docs for more information on CDC: http://docs.oracle.com/cd/B28359_01/server.111/b28313/cdc.htm

Q: Would different editions of Oracle, e.g. Enterprise Edition and Standard Edition, be considered heterogeneous environments?

A: Essentially yes, although the nomenclature is really only a categorization; it does not affect the operation, deployment or functionality of the replicator. All these features are part of the open source product.

Q: Can a 10g database (master) send the data to a 11g database (slave) for use in an upgrade?

A: Yes.

Q: Does the Oracle replicator require the Oracle database to be in archive mode?

A: Yes. This is a requirement for Oracle’s CDC implementation.

Q: How will we be able to revisit this recorded webinar?

A: Slides and a recording from today’s webinar will be available at http://www.slideshare.net/Continuent_Tungsten

 

Tungsten Replicator 3.0 is Cloudera Enterprise 5 Certified

One of the key platforms I’ve been testing on for the MySQL to Hadoop replication has been Cloudera, largely driven by customer requirements, but it’s also one of the easiest ways to get started with Hadoop.

What I’m even more pleased about is that we can announce that Tungsten Replicator 3.0 is certified for use on the new Cloudera Enterprise 5 platform. That means you can be confident that replicating your data from MySQL to Cloudera 5 will work without causing problems or difficulties during Hadoop loading and materialisation.

Cloudera is a great product, and we’re very happy to be working so effectively with the new Cloudera Enterprise 5. Cloudera certainly makes the core operation of managing and monitoring your Hadoop cluster so much easier, while still providing core functionality from the Hadoop family like Hive, HBase and Impala.

What I’m really interested in is the support for Spark, which will allow much easier live-querying and access to data.  That should make some data processing and live data views much easier to build and query further down the line.

Continuent Replication to Hadoop – Now in Stereo!

Hopefully by now you have already seen that we are working on Hadoop replication. I’m happy to say that it is going really well. I’ve managed to push a few terabytes of data and different data sets through into Hadoop on Cloudera, HortonWorks, and Amazon’s Elastic MapReduce (EMR). For those who have been following my long association with the IBM InfoSphere BigInsights Hadoop product, I’m pleased to say that it’s working there too. I’ve had to adapt Robert’s original script to work with the different versions of the underlying Hadoop tools and systems to make it compatible. The actual performance and process is unchanged; you just use a different JS-based batchloader script to work with different tools.

Robert has also been simplifying some of the core functionality, such as configuring some fixed pre-determined formats, so you no longer have to explicitly set the field and record separators.

I’ve also been testing the key feature of being able to integrate the provisioning of information using Sqoop, merging that original Sqooped data into Hadoop, and then following up with the change data that the replicator is effectively transferring over. The system works exactly as I’ve just described – start the replicator, Sqoop the data, materialise the view within Hadoop. It’s that easy; in fact, if you want a deeper demonstration of all of these features, we’ve got a video from my recent webinar session:

Real Time Data Loading from MySQL to Hadoop with New Tungsten Replicator 3.0

If you can’t spare the time, but still want to know about our Hadoop applier, try our short 5-minute video:

Real-time data loading into Hadoop with Tungsten Replicator

While you’re there, check out the Clustering video I did at the same time:

Continuent Tungsten Clustering

And of course, don’t forget that you can see the product and demos live by attending Percona Live in Santa Clara this week (1st-4th April).

Real-Time Data Loading from MySQL to Hadoop using Tungsten Replicator 3.0 Webinar

To follow up and describe some of the methods and techniques behind replicating into Hadoop from MySQL in real-time, and how this can be combined into your data workflow, Continuent are running a webinar with me presenting that will go over the details and provide a demo of the data replication process.

Real-Time Data Loading from MySQL to Hadoop with New Tungsten Replicator 3.0

Hadoop is an increasingly popular means of analyzing transaction data from MySQL. Up until now mechanisms for moving data between MySQL and Hadoop have been rather limited. The new Continuent Tungsten Replicator 3.0 provides enterprise-quality replication from MySQL to Hadoop. Tungsten Replicator 3.0 is 100% open source, released under a GPL V2 license, and available for download at https://code.google.com/p/tungsten-replicator/. Continuent Tungsten handles MySQL transaction types including INSERT/UPDATE/DELETE operations and can materialize binlogs as well as mirror-image data copies in Hadoop. Continuent Tungsten also has the high performance necessary to load data from busy source MySQL systems into Hadoop clusters with minimal load on source systems as well as Hadoop itself.

This webinar covers the following topics:

– How Hadoop works and why it’s useful for processing transaction data from MySQL
– Setting up Continuent Tungsten replication from MySQL to Hadoop
– Transforming MySQL data within Hadoop to enable efficient analytics
– Tuning replication to maximize performance.

You do not need to be an expert in Hadoop or MySQL to benefit from this webinar. By the end listeners will have enough background knowledge to start setting up replication between MySQL and Hadoop using Continuent Tungsten.

You can join the webinar on 27th March (Thursday), 10am PDT, 1pm EDT, or 5pm GMT by registering here: https://www1.gotomeeting.com/register/225780945

 

 

MySQL to Hadoop Step-By-Step

We had a great webinar on Thursday about replicating from MySQL to Hadoop (watch the whole thing). It was great, but one of the questions at the end was ‘is there an easy way to test’.

Sadly we can’t go giving out convenient ready-to-run downloads of these things because of licensing and other complexities, so I want to try and make it as simple and straightforward as possible by giving you the directions to complete. I’m going to point to the Continuent Documentation every now and then so this is not too crowded, but we should get through it pretty easily.

Major Decisions

For this to work: 

  • We’ll set up two VMs, one the master (running MySQL), the other the slave (running Cloudera)
  • The two VMs must be able to reach each other on the network. It doesn’t matter whether they are running Internal, NAT, or bridge-mode network, they just need to be able to ping and SSH each other. Switch off firewalls to prevent port weirdness.
  • For convenience, update your /etc/hosts to have a host1 (the master) and host2 (the slave)
  • The master must have followed the prereqs; for the slave it’s optional, but highly recommended

With that in mind, let’s get started.

Step 1: Setup your Master Host

There are a number of ways you can do this. If you want to simplify things and have VirtualBox, try downloading this VM. It’s a 1.5GB download containing an OVF VM, and is an Ubuntu host, with our prerequisites followed. To use this:

  1. Uncompress the package.
  2. Import the VM into your VirtualBox.
  3. If you want, change the network type from Internal to a bridged or NAT environment.

Using internal networking, you can login to this using:

shell> ssh -p2222 tungsten@localhost

Passwords are ‘password’ for tungsten and root.

If you don’t want to follow this but want your own VM:

  1. Create a new VM with 1-2GB of RAM, and 8GB or more of disk space
  2. Install your OS of choice, either Ubuntu or CentOS
  3. Follow our prerequisite instructions
  4. Make sure MySQL is set up and running, and that binary logging is enabled
  5. Make sure it has a valid IP address

Step 2: Setup Tungsten Replicator

Download the latest Tungsten Replicator binary from this page

Unpack the file:

shell> tar zxf tungsten-replicator-3.0.tar.gz

Change into the directory:

shell> cd tungsten-replicator-3.0

Create a new replicator installation; this will read from the binary log into THL:

shell> ./tools/tpm install alpha \
--install-directory=/opt/continuent \
--master=host1 \
--members=host1 \
--java-file-encoding=UTF8 \
--java-user-timezone=GMT \
--mysql-enable-enumtostring=true \
--mysql-enable-settostring=true \
--mysql-use-bytes-for-string=false \
--svc-extractor-filters=colnames,pkey \
--property=replicator.filter.pkey.addColumnsToDeletes=true \
--property=replicator.filter.pkey.addPkeyToInserts=true \
--replication-password=password \
--replication-user=tungsten \
--skip-validation-check=HostsFileCheck \
--skip-validation-check=ReplicationServicePipelines \
--start-and-report=true

For a full description of what’s going on here, see this page and click on the magnifying glass. You’ll get the full description of each option.

To make sure everything is OK, you should get a status from trepctl generated. If it’s running and it shows the status as online, we’re ready.

Step 3: Get your Cloudera Host Ready

There are lots of ways to get Cloudera’s Hadoop solution installed. The ready-to-run VM is the simplest by far.

  1. Download the Cloudera VM quick start host from here; there are versions for VirtualBox and VMware and KVM.
  2. Set the networking type to match the master.
  3. Start the host
  4. Set the hostname to host2
  5. Update the networking to an IP address that can talk to the master.
  6. Update /etc/hosts to add the IP address of host1 and host2 e.g.:

192.168.0.2 host1

Add a ‘tungsten’ user which we will use to install Tungsten Replicator.

Step 4: Install your Hadoop Slave

Download the latest Tungsten Replicator binary from this page

Unpack the file:

shell> tar zxf tungsten-replicator-3.0.tar.gz

Change into the directory:

shell> cd tungsten-replicator-3.0

Create a new replicator installation; this will read the information from the master (host1) and apply it to this host (host2):

shell> ./tools/tpm install alpha \
--batch-enabled=true \
--batch-load-language=js \
--batch-load-template=hadoop \
--datasource-type=file \
--install-directory=/opt/continuent \
--java-file-encoding=UTF8 \
--java-user-timezone=GMT \
--master=host1 \
--members=host2 \
'--property=replicator.datasource.applier.csv.fieldSeparator=\\u0001' \
--property=replicator.datasource.applier.csv.useQuotes=false \
--property=replicator.stage.q-to-dbms.blockCommitInterval=1s \
--property=replicator.stage.q-to-dbms.blockCommitRowCount=1000 \
--replication-password=secret \
--replication-user=tungsten \
--skip-validation-check=DatasourceDBPort \
--skip-validation-check=DirectDatasourceDBPort \
--skip-validation-check=HostsFileCheck \
--skip-validation-check=InstallerMasterSlaveCheck \
--skip-validation-check=ReplicationServicePipelines \
--start-and-report=true

For a description of the options, visit this page and click on the second magnifying glass to get the description.

As before, we want everything to be running and the replicator to be online. To check, run:

shell> trepctl status

This should tell you everything is running – if you get an error about this not being found, source the environment to populate your PATH correctly:

shell> source /opt/continuent/share/env.sh

We want everything to be online and running. If it isn’t, use the docs to help determine the reason, or use our discussion group to ask questions.
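
If you want to script this check rather than read the output by eye, something like the following Ruby sketch could work. It assumes that trepctl status prints one "name : value" pair per line and includes a state field; the sample text below is an abbreviated illustration rather than captured output, and the parse_status and online? helpers are hypothetical names:

```ruby
# Turn "name : value" lines into a Hash, ignoring lines without a colon.
def parse_status(text)
  text.each_line.each_with_object({}) do |line, fields|
    name, value = line.split(':', 2)
    next if value.nil?
    fields[name.strip] = value.strip
  end
end

# True only when the reported state is exactly ONLINE.
def online?(status_text)
  parse_status(status_text)['state'] == 'ONLINE'
end

sample = <<~STATUS
  role                   : slave
  serviceName            : alpha
  state                  : ONLINE
STATUS

puts online?(sample) ? 'replicator is online' : 'replicator needs attention'

# Against a live installation you might feed it real output instead:
#   online?(`trepctl status`)
```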

Step 5: Generating DDL

For your chosen MySQL database schema, you need to generate the staging and live table definitions for Hive.

A tool, ddlscan, is provided for this. You need to run it and provide the JDBC connect string for your database, and your user and password. If you followed the prereqs, use the one for the tungsten user.

First create the live table DDL:

shell> ddlscan -user tungsten -url 'jdbc:mysql://host1:3306/test' -pass password -template ddl-mysql-hive-0.10.vm -db test > schema.sql

Now apply it to Hive:

shell> cat schema.sql | hive

To create Hive tables that read the staging files loaded by the replicator, use the ddl-mysql-hive-0.10-staging.vm template on the same database:

shell> ddlscan -user tungsten -url 'jdbc:mysql://host1:3306/test' -pass password -template ddl-mysql-hive-0.10-staging.vm -db test > schema-staging.sql

Now apply it to Hive again:

shell> cat schema-staging.sql | hive

Step 6: Start Writing Data

Hopefully by this point you’ve got two VMs, one running MySQL and the master replicator extracting info from the MySQL binary log. On the other, you have a basic Cloudera instance with a slave replicator writing changes. Both replicators should be online (use ‘trepctl status’ to check).

All you need to do is start writing data into the tables you selected when creating the DDL. That should be it – you should see data start to stream into Hadoop.

Real-Time Replication from MySQL to Cassandra

Earlier this month I blogged about our new Hadoop applier, and I published the docs for that this week (http://docs.continuent.com/tungsten-replicator-3.0/deployment-hadoop.html) as part of the Tungsten Replicator 3.0 documentation (http://docs.continuent.com/tungsten-replicator-3.0/index.html). It contains some additional interesting nuggets that will appear in future blog posts.

The main part of that functionality, the part that performs the actual applying into Hadoop, is based around a JavaScript applier engine; there will eventually be docs for that as part of the Batch Applier content (http://docs.continuent.com/tungsten-replicator-3.0/deployment-batchloading.html). The core of this system is that it takes the information from the data stream of the THL and the CSV file that was written by the batch applier system, and runs the commands necessary to load it into Hadoop and perform any necessary merges.

I wanted to see how easy it would be to take that same flexible system and bend it to another database system; in my case, I chose Cassandra.

For the record, it took me a couple of hours to have this working, and I’m guessing another hour will file down some of the rough edges.

Cassandra is interesting as a database because it mixes a big distributed key/value store with a close-enough-to-SQL interface in the form of CQL. And that means we can make use of the CQL to help us perform the merging into the final tables in a manner not dissimilar to the method we use for loading into Vertica.

Back to the JavaScript batch loader: the applier provides five different implementable functions (all are technically optional) that you can use at different stages of the applier process. These are:

  • prepare() – called once when the applier goes online and can be used to create temporary directories or spaces
  • begin() – called at the start of each transaction
  • apply() – called at the end of the transaction once the data file has been written, but before the commit
  • commit() – called after each transaction commit has taken place; this is where we can consolidate info.
  • release() – called when the applier goes offline

We can actually align these functions with a typical transaction – prepare() happens before the statements even start, begin() is the same as BEGIN, apply() happens immediately before COMMIT and commit() happens just after. release() can be used to do any clean up afterwards.
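
To make that ordering concrete, here is a small Ruby simulation. It is not the replicator’s own driver (the real hooks are JavaScript functions called by the batch applier); the run_batches function and the hook lambdas are hypothetical and simply record the order in which the five stages would fire for a given number of transactions:

```ruby
# Call the five hooks in the order described above and return the call log.
def run_batches(transaction_count, hooks)
  log = []
  call = lambda do |name|
    hooks[name]&.call                # every hook is optional
    log << name
  end

  call.(:prepare)                    # once, when the applier goes online
  transaction_count.times do
    call.(:begin)                    # start of the transaction
    call.(:apply)                    # data file written, not yet committed
    call.(:commit)                   # after the commit has taken place
  end
  call.(:release)                    # once, when the applier goes offline
  log
end

hooks = { apply:  -> { puts 'loading CSV into staging' },
          commit: -> { puts 'merging staging into live' } }
p run_batches(2, hooks)
```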

So let’s put this into practice and use it for Cassandra.

The basic process for loading is as follows:

  1. Write a CSV file to load into Cassandra
  2. Load the CSV file into a staging table within Cassandra; this is easy through CQL using the ‘COPY tablename FROM filename’ CQL statement.
  3. Merge the staging table data with a live table to create a carbon copy of our MySQL table content.

For the loading portion, what we’ll do is load the CSV into a staging table, and then we’ll merge the staging table and live table data together during the commit stage of our batch applier. We’ll return to this in more detail.

For the merging, we’ll take the information from the staging table, which includes the sequence number and operation type, and then write the ‘latest’ version of that row and put it into the live table. That gives us a structure like this:

Cassandra Loader

Tungsten Replicator is going to manage this entire process for us – all we need to do is install the replicators, plug in these custom bits, and let it run.

As with the Hadoop applier, what we’re going to do is use the batch applier to generate only insert and delete rows; UPDATE statements will be converted into a delete of the original version and insert of the new version. So:

INSERT INTO sample VALUES (1,'Message')

Is an insert…

DELETE FROM sample WHERE id = 1

Is a delete, and:

UPDATE sample SET message = 'Now you see me' WHERE id = 1

is actually:

DELETE FROM sample WHERE id = 1
INSERT INTO sample VALUES (1,'Now you see me')

This gets round the problem of doing updates (which in big data stores are expensive, particularly in Hadoop, which doesn’t support updating existing data) by turning each one into a more efficient delete and insert.
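
The conversion itself is simple enough to sketch in Ruby. The event hashes and the to_staging_ops name below are hypothetical and do not reflect the replicator’s internal data structures; the point is only that an update fans out into a delete of the old row followed by an insert of the new one:

```ruby
# Map one source change to the insert/delete operations written to the CSV.
def to_staging_ops(event)
  case event[:type]
  when :insert then [['I', event[:after]]]
  when :delete then [['D', event[:before]]]
  when :update then [['D', event[:before]], ['I', event[:after]]]
  else raise ArgumentError, "unsupported event type #{event[:type].inspect}"
  end
end

update = { type: :update,
           before: { id: 1, message: 'Message' },
           after:  { id: 1, message: 'Now you see me' } }

to_staging_ops(update).each { |optype, row| puts "#{optype} #{row}" }
```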

In the CSV data itself, this is represented by prefixing every row with three fields:

optype, sequence number, unique id

Optype is ‘D’ for a delete and ‘I’ for an insert and is used to identify what needs to be done. The sequence number is the unique transaction ID from the replicator THL. This number increases by one for every transaction, and this means we can always identify the ‘latest’ version of a row, which is important to us when processing the transaction into Cassandra. The unique ID is the primary key (or compound key) from the source data. We need this to ensure we update the right row. To replicate data in this way, we must have a primary key on the data. If you don’t have primary keys, you are probably in a world of hurt anyway, so it shouldn’t be a stretch.
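
As a rough illustration of that layout, the Ruby sketch below builds and parses such rows for the two-column sample table. The helper names are hypothetical, and the format is simplified: it uses a plain comma separator with no quoting or escaping, which real batch CSV files would need to handle.

```ruby
# Prefix a row with optype, sequence number and key, then append the columns.
def staging_line(optype, seqno, key, columns)
  [optype, seqno, key, *columns].join(',')
end

# Split a line back into its prefix fields and the remaining column values.
def parse_staging_line(line)
  optype, seqno, key, *columns = line.chomp.split(',')
  { optype: optype, seqno: Integer(seqno), key: key, columns: columns }
end

lines = [
  staging_line('I', 10, 1, [1, 'Message']),
  staging_line('D', 11, 1, [1, 'Message']),
  staging_line('I', 11, 1, [1, 'Now you see me'])
]

# The highest sequence number among the inserts identifies the latest version.
latest = lines.map { |l| parse_staging_line(l) }
              .select { |r| r[:optype] == 'I' }
              .max_by { |r| r[:seqno] }
puts latest[:columns].last
```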

One difficulty here is that we need to cope with an idiosyncrasy of Cassandra, which is that by default, Cassandra orders fields in the ‘tables’ (really collections of key/values) so that integers and numbers appear first in the table, and text appears last. This is an optimisation that Cassandra makes that complicates things for us, but only in a very small way. For the moment, we’ll handle it by assuming that we are loading only one table with a known format into Cassandra. We could handle multiple tables by using a simple IF statement in the JS and using different formats for that, or we could actually extract the info from the incoming data; I’m going to skip that because it keeps us away from the cool element of actually getting the data in.

Within Cassandra then we have two tables, the table we are loading data into, and the staging table that we load the CSV data into. For our sample, the live schema is ‘sample’, the live table is ‘sample’ and the staging table is ‘staging_sample’.

The definitions for these in Cassandra are as follows. For the sample live table:

 CREATE TABLE sample (
 id int,
 message text,
 PRIMARY KEY (id)
 ) WITH
 bloom_filter_fp_chance=0.010000 AND
 caching='KEYS_ONLY' AND
 comment='' AND
 dclocal_read_repair_chance=0.000000 AND
 gc_grace_seconds=864000 AND
 index_interval=128 AND
 read_repair_chance=0.100000 AND
 replicate_on_write='true' AND
 populate_io_cache_on_flush='false' AND
 default_time_to_live=0 AND
 speculative_retry='99.0PERCENTILE' AND
 memtable_flush_period_in_ms=0 AND
 compaction={'class': 'SizeTieredCompactionStrategy'} AND
 compression={'sstable_compression': 'LZ4Compressor'};

And for the staging_sample table:

CREATE TABLE staging_sample (
 optype text,
 seqno int,
 fragno int,
 id int,
 message text,
 PRIMARY KEY (optype, seqno, fragno, id)
 ) WITH
 bloom_filter_fp_chance=0.010000 AND
 caching='KEYS_ONLY' AND
 comment='' AND
 dclocal_read_repair_chance=0.000000 AND
 gc_grace_seconds=864000 AND
 index_interval=128 AND
 read_repair_chance=0.100000 AND
 replicate_on_write='true' AND
 populate_io_cache_on_flush='false' AND
 default_time_to_live=0 AND
 speculative_retry='99.0PERCENTILE' AND
 memtable_flush_period_in_ms=0 AND
 compaction={'class': 'SizeTieredCompactionStrategy'} AND
 compression={'sstable_compression': 'LZ4Compressor'};

I’ve put both tables into a ‘sample’ collection.

Remember that idiosyncrasy I mentioned? Here it is: a bare table loading from CSV will actually order the data as:

seqno,uniqno,id,optype,message

This is Cassandra’s way of optimising integers over text to speed up lookups, but for us it is a minor niggle. Right now, I’m going to handle it by assuming we are replicating only one schema/table and that we know what the structure of that looks like. Longer term, I want to pull it out of the metadata, but that’s a refinement.

So let’s start by having a look at the basic JS loader script. It’s really the component that is going to handle the core element of the work, managing the CSV files that come in from the batch engine and applying them into Cassandra. Remember, there are five functions that we can define, but for the purposes of this demonstration we’re going to use only two of them: apply(), which will load the CSV file into Cassandra, and commit(), which will perform the steps to merge the stage data.

The apply() function does two things: it identifies the table and schema, and then runs the command to load this data into Cassandra through the cqlsh command-line tool. We actually can’t run CQL directly from this command line, but I wrote a quick shell script that pipes CQL from the command-line into a running cqlsh.

The commit() function on the other hand is simpler, although it does a much more complicated job using another external script, this time written in Ruby.

So this gives us a cassandra.js script for the batch applier that looks like this:

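// sqlParams, csv_file, schema and table are implicit globals;
// commit() relies on the schema value set by the most recent apply() call.
function apply(csvinfo)
{
  sqlParams = csvinfo.getSqlParameters();
  csv_file = sqlParams.get("%%CSV_FILE%%");
  schema = csvinfo.schema;
  table = csvinfo.table;
  // Load the CSV file into the staging table through the cqlsh wrapper script
  runtime.exec("/opt/continuent/share/applycqlsh.sh " + schema + ' "copy staging_' + table + " (optype,seqno,uniqno,id,message) from '" + csv_file + "';\"");
}

function commit()
{
  // Merge the staged rows into the live table
  runtime.exec("/opt/continuent/share/merge.rb " + schema);
}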

So, the apply() function is called for each event as written into the THL from the MySQL binary log, and the content of the CSV file generated at that point contains the contents of the THL event; if it’s one row, it’s a one-row CSV file; if it’s a statement or transaction that created 2000 rows, it’s a 2000 row CSV file.

The csvinfo object that is provided contains information about the batch file that is written, including, as you can see here, the schema and table names, and the sequence number. Note that we could, at this point, pull out table info, but we’re going to concentrate on pulling a single table here just for demo purposes.

The CQL for loading the CSV data is:

COPY staging_tablename (optype,seqno,uniqno,id,message) from 'FILENAME';

This says, copy the specific columns in this order from the file into the specified table. As I mentioned, currently this is hard coded into the applier JS, but would be easy to handle for more complex schemas and structures.
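
One way to remove the hard-coded column list is to look it up per table. The Ruby sketch below only builds the CQL text; the STAGING_COLUMNS map, the copy_statement name, the second table and the file path are all hypothetical, and the column order would have to match whatever the batch CSV actually contains:

```ruby
# Column order of each staging table's CSV file (illustrative values only).
STAGING_COLUMNS = {
  'sample' => %w[optype seqno uniqno id message],
  'users'  => %w[optype seqno uniqno id name email]   # hypothetical second table
}.freeze

# Build the COPY statement for one table's CSV file.
def copy_statement(table, csv_file, columns_by_table = STAGING_COLUMNS)
  columns = columns_by_table.fetch(table) do
    raise ArgumentError, "no column list configured for table #{table}"
  end
  "COPY staging_#{table} (#{columns.join(',')}) FROM '#{csv_file}';"
end

puts copy_statement('sample', '/tmp/sample.csv')
```

The same lookup could equally live in the JS applier itself; Ruby is used here only to keep the sketch self-contained.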

The commit() function is even simpler, because it just calls a script that will do the merging for us – we’ll get to that in a minute.

So here’s the script that applies an arbitrary CQL statement into Cassandra:

#!/bin/bash
# First argument is the keyspace; everything after it is the CQL statement to run
SCHEMA=$1;shift
echo "$*" | cqlsh -k "$SCHEMA" tr-cassandra2

Really simple, but gets round a simple issue.

The script that does the merge work is more complex; in other environments we might be able to do this all within SQL, but CQL is fairly limited with no sub-queries. So we do it long-hand using Ruby. The basic sequence is quite simple, and is in three phases:

  1. Delete every row mentioned in the staging table with an optype of D with a matching unique key
  2. Insert the *last* version of an insert for each unique ID – the last version will be the latest one in the output. We can pick this out by just iterating over every insert and picking the one with the highest Sequence number as generated by the THL transaction ID.
  3. Delete the content from the staging table because we’ve finished with it. That empties the staging table ready for the next set of transactions.

That file looks like this:

#!/usr/bin/ruby

require 'cql'

client = Cql::Client.connect(hosts: ['192.168.1.51'])
client.use('sample')

# Phase 1: delete every live row that has a 'D' entry in the staging table
rows = client.execute("SELECT id FROM staging_sample where optype = 'D'")

deleteids = Array.new()

rows.each do |row|
  puts "Found ID #{row['id']} has to be deleted"
  deleteids.push(row['id'])
end

# Skip the delete entirely when no rows were flagged for deletion
unless deleteids.empty?
  deleteidlist = deleteids.join(",")
  client.execute("delete from sample where id in (#{deleteidlist})");
  puts("delete from sample where id in (#{deleteidlist})");
end

# Phase 2: keep only the latest insert (highest seqno) for each ID
rows = client.execute("SELECT * FROM staging_sample where optype = 'I'");

updateids = Hash.new()
updatedata = Hash.new()

rows.each do |row|
  id = row['id']
  puts "Found ID #{id} seq #{row['seqno']} has to be inserted"
  if updateids[id].nil? || updateids[id] < row['seqno']
    updateids[id] = row['seqno']
    # Strip the staging-only columns so that only live-table columns remain
    row.delete('seqno')
    row.delete('fragno')
    row.delete('optype')
    updatedata[id] = row
  end
end

updatedata.each do |rowid,rowdata|
  puts "Should update #{rowdata['id']} with #{rowdata['message']}"
  collist = rowdata.keys.join(',')
  colcount = rowdata.keys.length
  substbase = Array.new()
  #  (1..colcount).each {substbase.push('?')}
  rowdata.values.each do |value|
    if value.is_a?(String)
      # Quote strings for CQL, doubling any embedded single quotes
      substbase.push("'" + value.gsub("'", "''") + "'")
    else
      substbase.push(value)
    end
  end

  substlist = substbase.join(',')

  puts('Column list: ',collist)
  puts('Subst list: ',substlist)
  cqlinsert = "insert into sample ("+collist+") values ("+substlist+")"
  puts("Statement: " + cqlinsert)
  client.execute(cqlinsert)
end

# Phase 3: empty the staging table ready for the next set of transactions
client.execute("delete from staging_sample where optype in ('D','I')")

Again, this is currently hard-coded, but I could easily have got the schema/table name from the JS batch applier – the actual code is table agnostic and will work with any table.

So, I’ve set up two replicators – one uses cassandra.js rather than hadoop.js but works the same way – and copied applycqlsh.sh and merge.rb into /opt/continuent/share.

And we’re ready to run. Let’s try it:

mysql> insert into sample values (0,'First Message');
Query OK, 1 row affected (0.01 sec)

We’ve inserted one row. Let’s go check Cassandra:

cqlsh:sample> select * from sample;

id  | message
-----+---------------
489 | First Message

Woohoo – data from MySQL straight into Cassandra.

Now let’s try updating it:

mysql> update sample set message = 'Updated Message' where id = 489;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

And in Cassandra:

cqlsh:sample> select * from sample;

id  | message
-----+-----------------
489 | Updated Message

Bigger woohoo. Not only am I loading data directly into Cassandra, but I can update it as well. Now I can have a stream of updates and information within MySQL replicated over to Cassandra for whatever analysis or information that I need without any issues.

Cool huh? I certainly think so (OK, but I’m biased).

Now I haven’t tested it, but this should just as easily work from Oracle; I’ll be testing that and let you know.

Any other database destinations people would like to see for replicating into? If so, let me know and I’ll see what I can do.

Anonymizing Data During Replication

If you happen to work with personal data, chances are you are subject to SOX (Sarbanes-Oxley) whether you like it or not.

One of the worst aspects of this is that if you want to be able to analyse your data and you replicate out to another host, you have to find a way of anonymizing the information. There are of course lots of ways of doing this, but if you are replicating the data, why not anonymize it during the replication?

Of the many cool features in Tungsten Replicator, one of my favorites is filtering. This allows you to process the stream of changes that are coming from the data extracted from the master and perform operations on it. We use it a lot in the replicator for ignoring tables, schemas and columns, and for ensuring that we have the correct information within the THL.

Given this, let’s use it to anonymize the data as it is being replicated so that we don’t need to post-process it for analysis, and we’re going to use JavaScript to do that.

For the actual anonymization, we’re going to use a simple function that hands the content off to a hashing function. For this, I’m going to use the JavaScript md5 function provided by Paul Johnston here: http://pajhome.org.uk/crypt/md5, although others are available. The main benefit of using md5 is that the same string of text will always be hashed into a consistent value. This is important because it means that we can still run queries with joins between the data, knowing that, for example, the postcode ‘XQ23 1LD’ will be hashed into the same 32-character hexadecimal value in every table. Joins still work, and data analysis is entirely valid.
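To make the "same input, same hash" point concrete, here is a minimal sketch. It assumes Node.js and uses its built-in crypto module as a stand-in for the hex_md5() function used in the filter; the hexMd5 helper and the two sample rows are made up for illustration and are not part of the replicator.

```javascript
// Sketch only: Node.js crypto stands in for the hex_md5() function used by the filter.
const crypto = require('crypto');

// Hypothetical helper: returns the MD5 digest of a string as hex text.
function hexMd5(text) {
  return crypto.createHash('md5').update(text, 'utf8').digest('hex');
}

// Two hypothetical rows from different tables that hold the same postcode.
const addressRow = { id: 1, postcode: 'XQ23 1LD' };
const orderRow = { orderId: 77, deliveryPostcode: 'XQ23 1LD' };

const a = hexMd5(addressRow.postcode);
const b = hexMd5(orderRow.deliveryPostcode);

// Same input gives the same digest, so a join on the hashed column still matches.
console.log(a === b);
// An MD5 hex digest is always 32 characters long.
console.log(a.length);
// A different postcode gives a different digest.
console.log(hexMd5('XQ23 1LE') === a);
```

The hashed column behaves like an opaque join key: equal values stay equal, but the original text is no longer stored on the slave.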

Better still, because it’s happening during replication I always have a machine with anonymised information that I can use to query my data without worrying about SOX tripping me up.

Within the JS filter environment there are two key functions we need to use. One is prepare(), which is called when the replicator goes online, and the other is filter() which processes each event within the THL.

In the prepare() function, I’m going to identify from the configuration file which fields we are going to perform the actual hashing operation on. We do that by creating a hash structure within JavaScript that maps the schema, table name, and field. For example, to anonymise the field ‘postcode’ in ‘address’ in the schema ‘customers’:

stcspec=customers.address.postcode

For this to work, we must have the colnames filter enabled.

The function itself just splits the stcspec parameter from the configuration file into a hash of that combo:

var stcmatch = {};
function prepare()
{
  logger.info("anonymizer: Initializing...");

  var stcspec = filterProperties.getString("stcspec");
  var stcarray = stcspec.split(",");
  for (var i = 0; i < stcarray.length; i++) {
    stcmatch[stcarray[i]] = 1;
  }
}
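As a rough illustration of what that lookup ends up holding, here is a stand-alone sketch that runs outside the replicator. The buildMatchTable name, the second field in the spec string, and the whitespace trimming are my own additions for the example, not something the filter above does.

```javascript
// Stand-alone sketch of the lookup built by prepare(); the spec string is a made-up example.
function buildMatchTable(stcspec) {
  var match = {};
  var parts = stcspec.split(',');
  for (var i = 0; i < parts.length; i++) {
    // Tolerate spaces after commas (an addition for this sketch, not in the filter itself)
    var key = parts[i].trim();
    if (key.length > 0) {
      match[key] = 1;
    }
  }
  return match;
}

var exampleMatch = buildMatchTable('customers.address.postcode, customers.address.phone');

// A listed schema.table.column is found...
console.log('customers.address.postcode' in exampleMatch);
// ...and an unlisted one is not.
console.log('customers.address.city' in exampleMatch);
```

Using the full schema.table.column string as the key keeps the per-column check in the filter down to a single property lookup.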

The filter() function is provided one value, the event object from the THL. We operate only on ROW-based data (to save us parsing the SQL statement), and then supply the event to an anonymize() function for the actual processing:

function filter(event)
{
  data = event.getData();
  if(data != null)
  {
    for (i = 0; i < data.size(); i++)
    {
      d = data.get(i);

      if (d != null && d instanceof com.continuent.tungsten.replicator.dbms.StatementData)
      {
          // Ignore statements
      }
      else if (d != null && d instanceof com.continuent.tungsten.replicator.dbms.RowChangeData)
      {
          anonymize(event, d);
      }
    }
  }
}

Within the anonymize() function, we extract the schema and table name, and then look at each column. If the column exists in our earlier stcspec hash, we change the content of the THL on the way past to be the hashed value, in place of the original field value. To do this we iterate over the rowChanges, then over the columns, then over the individual rows:

function anonymize(event, d)
{
  rowChanges = d.getRowChanges();

  for(j = 0; j < rowChanges.size(); j++)
  {
    oneRowChange = rowChanges.get(j);
    var schema = oneRowChange.getSchemaName();
    var table = oneRowChange.getTableName();
    var columns = oneRowChange.getColumnSpec();

    columnValues = oneRowChange.getColumnValues();
    for (c = 0; c < columns.size(); c++)
    {
      columnSpec = columns.get(c);
      columnname = columnSpec.getName();

      rowchangestc = schema + '.' + table + '.' + columnname;

      if (rowchangestc in stcmatch) {

        for (row = 0; row < columnValues.size(); row++)
        {
            values = columnValues.get(row);
            value = values.get(c);
            value.setValue(hex_md5(value.getValue()));

        }
      }
    }
  }
}
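If you want to see the shape of that loop without a running replicator, here is a small simulation. It assumes Node.js, uses plain arrays and objects in place of the replicator’s row-change objects, and uses Node’s crypto module in place of hex_md5(); the rowChange layout, anonymizeRows and hexMd5 are all hypothetical names for this sketch.

```javascript
// Sketch: plain arrays and objects stand in for the replicator's row-change objects.
const crypto = require('crypto');

// Hypothetical helper standing in for hex_md5().
function hexMd5(text) {
  return crypto.createHash('md5').update(String(text), 'utf8').digest('hex');
}

// Hypothetical row change: two rows inserted into customers.address.
var rowChange = {
  schema: 'customers',
  table: 'address',
  columns: ['id', 'postcode'],
  rows: [
    [1, 'XQ23 1LD'],
    [2, 'QX17 1LG']
  ]
};

var match = { 'customers.address.postcode': 1 };

// Walk the columns first; only for a listed column walk every row and hash that cell.
function anonymizeRows(change, matchTable) {
  for (var c = 0; c < change.columns.length; c++) {
    var key = change.schema + '.' + change.table + '.' + change.columns[c];
    if (!(key in matchTable)) {
      continue; // column not listed, leave it untouched
    }
    for (var r = 0; r < change.rows.length; r++) {
      change.rows[r][c] = hexMd5(change.rows[r][c]);
    }
  }
  return change;
}

anonymizeRows(rowChange, match);
console.log(rowChange.rows);
```

Checking the column once and then walking the rows means the lookup cost is paid per column rather than per cell, which is the same ordering the filter uses.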

Append the md5() script from Paul Johnston (or indeed whichever md5 / hashing algorithm you want to use) to the end of the entire script text:

var stcmatch = {};
function prepare()
{
  logger.info("anonymizer: Initializing...");

  var stcspec = filterProperties.getString("stcspec");
  var stcarray = stcspec.split(",");
  for (var i = 0; i < stcarray.length; i++) {
    stcmatch[stcarray[i]] = 1;
  }
}

function filter(event)
{
  data = event.getData();
  if(data != null)
  {
    for (i = 0; i < data.size(); i++)
    {
      d = data.get(i);

      if (d != null && d instanceof com.continuent.tungsten.replicator.dbms.StatementData)
      {
          // Ignore statements
      }
      else if (d != null && d instanceof com.continuent.tungsten.replicator.dbms.RowChangeData)
      {
          anonymize(event, d);
      }
    }
  }
}

function anonymize(event, d)
{
  rowChanges = d.getRowChanges();

  for(j = 0; j < rowChanges.size(); j++)
  {
    oneRowChange = rowChanges.get(j);
    var schema = oneRowChange.getSchemaName();
    var table = oneRowChange.getTableName();
    var columns = oneRowChange.getColumnSpec();

    columnValues = oneRowChange.getColumnValues();
    for (c = 0; c < columns.size(); c++)
    {
      columnSpec = columns.get(c);
      columnname = columnSpec.getName();

      rowchangestc = schema + '.' + table + '.' + columnname;

      if (rowchangestc in stcmatch) {

        for (row = 0; row < columnValues.size(); row++)
        {
            values = columnValues.get(row);
            value = values.get(c);
            value.setValue(hex_md5(value.getValue()));

        }
      }
    }
  }
}

Hint

Depending on your configuration, datatypes and version, the return from getValue() may be a byte array rather than a character string; in that case, add this function:

function byteArrayToString(byteArray)
{
   // Declare locals with var so this loop does not overwrite the global i used by filter()
   var str = "";
   for (var i = 0; i < byteArray.length; i++)
   {
      str += String.fromCharCode(byteArray[i]);
   }
   return str;
}

And change:

value.setValue(hex_md5(value.getValue()));

to:

value.setValue(hex_md5(byteArrayToString(value.getValue())));

That will correctly convert it into a string.
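One thing to watch if your data contains non-ASCII characters: Java bytes are signed, so values above 127 arrive as negative numbers. Here is a hedged sketch of a variant that masks each byte first. It runs in plain JavaScript with an Int8Array standing in for the Java byte array; bytesToLatin1 is a hypothetical name, and it treats each byte as a single Latin-1 character rather than decoding multi-byte UTF-8.

```javascript
// Sketch: an Int8Array stands in for a Java byte[] (both hold signed 8-bit values).
function bytesToLatin1(byteArray) {
  var str = '';
  for (var k = 0; k < byteArray.length; k++) {
    // Mask to 0..255 so a negative (signed) byte maps to the intended character code
    str += String.fromCharCode(byteArray[k] & 0xff);
  }
  return str;
}

// The ASCII bytes for "QX17 1LG"
var sample = new Int8Array([81, 88, 49, 55, 32, 49, 76, 71]);
console.log(bytesToLatin1(sample));

// 0xE9 stored as a signed byte is -23; masking recovers character code 233
var accented = new Int8Array([-23]);
console.log(bytesToLatin1(accented).charCodeAt(0));
```

For pure ASCII data the mask makes no difference, so it is only worth adding if you expect accented or other high-byte characters in the fields you are hashing.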

If the filter does hit this error, Tungsten Replicator will just stop on that event rather than apply bad data. Putting it ONLINE again after changing the script will re-read the event, re-process it through the filter, and then apply the data.

end Hint

Now we need to manually update the configuration. On a Tungsten Replicator slave, open the static-SERVICENAME.properties file in /opt/continuent/tungsten/tungsten-replicator/conf and then add the following lines within the filter specification area (about 90% of the way through):

replicator.filter.anonymize=com.continuent.tungsten.replicator.filter.JavaScriptFilter 
replicator.filter.anonymize.script=/opt/continuent/share/anonymizer.js 
replicator.filter.anonymize.stcspec=customers.address.postcode

The first line defines a filter called anonymize that uses the JavaScriptFilter engine, the second line specifies the location of the JavaScript file, and the third contains the specification of which fields we will change, as a comma-separated list.

Now, find the line containing “replicator.stage.q-to-dbms.filters” around about line 200 or so and add ‘anonymize’ to the end of the filter list.

Finally, make sure you copy the anonymizer.js script into the /opt/continuent/share directory (or wherever you want to put it that matches the paths specified above).

Now restart the replicator:

$ replicator restart

On your master, make sure you have the colnames filter enabled. You can do this in the master’s static-SERVICENAME.properties like this:

replicator.stage.binlog-to-q.filters=colnames,pkey

Now restart the master replicator:

$ replicator restart

Double check that the replicator is online using trepctl; it will fail if the config is wrong or the JavaScript isn’t found. If everything is running, go to your master and make sure you enable row-based logging (my.cnf: binlog-format='ROW'), and then try inserting some data into the table that we are anonymizing:

mysql> insert into address values(0,'QX17 1LG');

Now check the value on the slave:

| 711 | dc889465b382 |

Woohoo!

Anonymized data now exists on the slave without having to manually run the process to clean the data.

If you want to extend the fields this is applied to, add them to the stcspec in the configuration, separating each one by a comma, and make sure you restart the replicator.