Upcoming Webinar, 19th July, What is New in Tungsten Replicator 5.2 and Tungsten Clustering 5.2?

Continuent Tungsten 5.2 is just around the corner. This is one of our most exciting Tungsten product releases for some time!

In this webinar we’re going to have a look at a host of new features in the new release, including
Three new Replication Applier Targets (Kafka, Cassandra and Elasticsearch)
New improvements to our core command-line tools trepctl and thl
New foundations for our filtering services, and
Improvements to the compatibility between replication and clustering

This webinar is going be a packed session and we’ll show all the exciting stuff with more in-depth follow-up sessions in the coming weeks.

 

You’ll also learn about some more exciting changes coming in the upcoming Tungsten releases (5.2.1 and 5.3), and our major Tungsten 6.0 release due out by the end of the year.

So come and join us to get the low down on everything related to Tungsten Replicator 5.2 and Tungsten Clustering 5.2. on Wednesday, July 19, 2017 9:00 AM – 9:30 AM PDT at https://attendee.gotowebinar. com/register/ 4108437731342545667

New Continuent Webinar Wednesdays and Training Tuesdays

We are just starting to get into the swing of setting up our new training and webinar schedule.
Initially, there will be one Webinar session (typically on a Wednesday) and one training session (on a Tuesday) every week from now. We’ll be covering a variety of different topics at each.
Typically our webinars will be about products and features, comparisons to other products, mixed in with product news (new releases, new features) and individual sessions based on what is going on at Continuent and the market in general.
Our training, by comparison, is going to be a hands-on, step-by-step sequence covering all of the different areas of our product. So we’ll cover everything from the basics of how the products work, how to deploy them, typical functionality (switching, start/stop, etc), and troubleshooting.
All of the sessions are going to be recorded and we’ll produce a suitable archive page so that you can go and view the past sessions. Need a refresher on re-provisioning a node in your cluster? There’s going to be a video for it and documentation to back it up.
Our first webinar is actually next Thursday (the Wednesday rule wouldn’t be a good one without an exception) and is all about MySQL Multi-Site/Multi-Master Done Right:
In this webinar, we discuss what makes Tungsten Clustering better than other alternatives (AWS RDS, Galera, MySQL InnoDB Cluster, and XtraDBCluster), especially for geographically distributed multi-site deployments, both for disaster recovery and multi-site/multi-master needs.
If you want to attend, please go ahead and register using this link: http://go.continuent.com/n0JI04Q03EAV0RD0i000Vo4
Keep your eyes peeled for the other upcoming sessions. More details soon.

Kafka Replication from MySQL and Oracle

Hello again everybody.

Well, I promised it a couple of weeks ago, and I’m sorry it has been so long (I’ve been working on other fun stuff in addition to this). But I’m pleased to say that we now have a fully working applier that takes data from an incoming THL stream, whether that is Oracle or MySQL, and converts that into a JSON document and message for distribution over a Kafka topic.

Currently, the configuration is organised with the following parameters:

  • The topic name is set according to the incoming schema and table. You can optionally add a prefix. So, for example, if you have a table ‘invoices’ in the schema ‘sales’, your Kafka topic will be sales_invoices, or if you’ve added a prefix, ‘myprefix_schema_table’.
  • Data is marshalled into a JSON document as part of the message, and the structure is to have a bunch of metadata and then an embedded record. You’ll see an example of this below. You can choose what metadata is included here. You can also choose to send everything on a single topic. I’m open to suggestions on whether it would be useful for this to be configured on a more granular level.
  • The msgkey is composed of the primary key information (if we can determine it), or the sequence number otherwise.
  • Messages are generated one row of source data at a time. There were lots of ways we could have done this, especially with larger single dumps/imports/multi-million-row transactions. There is no more sensible way. It may mean we get duplicate messages into Kafka, but these are potentially easier to handle than trying to send a massive 10GB Kafka message.
  • Since Zookeeper is a requirement for Kafka, we use Zookeeper to record the replicator status information.

Side note: One way I might consider mitigating that last item (and which may also apply to some of our other upcoming appliers, such as the ElasticSearch applier) is to actually change the incoming THL stream so that it is split into individual rows. This sounds entirely crazy, since it would separate the incoming THL sequence number from the source (MySQL binlog, Oracle, er, other upcoming extractors), but it would mean that we have THL on the applier side which is a single row of data. That means we would have a THL seqno per row of data, but would also mean that in the event of a problem, the replicator could restart from that one row of data, rather than restarting from the beginning of a multi-million-row transaction.

Anyway, what does it all look like in practice?

Well, here’s a simple MySQL instance and I’m going to insert a row into this table:

mysql> insert into sbtest.sbtest values (0,100,"Base Msg","Some other submsg");

OK, this looks like this:

mysql> select * from sbtest.sbtest where k = 100;
+--------+-----+----------+-------------------+
| id     | k   | c        | pad               |
+--------+-----+----------+-------------------+
| 255759 | 100 | Base Msg | Some other submsg |
+--------+-----+----------+-------------------+

Over in Kafka, let’s have a look what the message looks like. I’m just using the console consumer here:

{"_meta_optype":"INSERT","_meta_committime":"2017-05-27 14:27:18.0","record":{"pad":"Some other submsg","c":"Base Msg","id":"255759","k":"100"},"_meta_source_table":"sbtest","_meta_source_schema":"sbtest","_meta_seqno":"10130"}

And let’s reformat that into something more useful:

{
   "_meta_committime" : "2017-05-27 14:27:18.0",
   "_meta_source_schema" : "sbtest",
   "_meta_seqno" : "10130",
   "_meta_source_table" : "sbtest",
   "_meta_optype" : "INSERT",
   "record" : {
      "c" : "Base Msg",
      "k" : "100",
      "id" : "255759",
      "pad" : "Some other submsg"
   }
}

 

Woohoo! Kafka JSON message. We’ve got the metadata (and those field names/prefixes are likely to change), but we’ve also got the full record details. I’m still testing other data types and ensuring we get the data through correctly, but I don’t foresee any problems.

There are a couple of niggles still to be resolved:

  • The Zookeeper interface which is used to store state data needs addressing; although it works fine there are some occasional issues with key/path collisions.
  • Zookeeper and Kafka connection are not fully checked, so it’s possible to appear to be up and running when no connection is available.
  • Some further tweaking of the configuration would be helpful – for example, setting or implying specific formats for msg key and the embedded data.

I may add further configurations for other items, especially since longer term we might have a Kafka extractor and maybe we want to use that to distribute records, in which case we might want to track other information like the additional metadata and configuration (SQL mode etc) currently held within the THL. I’ll keep thinking about that though.

Anything else people would like to see here? Please email me at mc.brown@continuent.com and we’ll sort something out.

 

Replicating into ElasticSearch

So here at Continuent we are working on multiple new targets for applying data using Tungsten Replicator. There are so many potential targets out there where people want to replicate data directly into a specific system, sometimes just for a specific data set, table, database or requirements.

Yesterday afternoon, I started working on ElasticSearch – this morning I have it finished!

As with all solutions, the same basic principles apply – want to pull out of MySQL or Oracle and into something else? That’s fine. Want to replicate to HDFS and ElasticSearch? We do that too!

So what does it look like?

Installation operates just our normal appliers – you just specify the datasource type (ElasticSearch) and the EL host name and port:

tools/tpm configure alpha \
--datasource-type=elasticsearch \
--install-directory=/opt/continuent \
--master=ubuntuheterosrc \
--members=elasticsearch \
--replication-host=localhost \
--replication-password=root \
--replication-port=9200 \
--replication-user=root

There are some configurable options, but I’ll get to those later. For right now, let’s just see what happens when you insert some data. Here’s a simple table in MySQL:

mysql> describe mg;

+-------+----------+------+-----+---------+----------------+

| Field | Type     | Null | Key | Default | Extra          |

+-------+----------+------+-----+---------+----------------+

| id    | int(11)  | NO   | PRI | NULL    | auto_increment |

| msg   | char(80) | YES  |     | NULL    |                |

+-------+----------+------+-----+---------+----------------+

2 rows in set (0.00 sec)

And Let’s insert some data:

mysql> insert into mg values (99999,"Hello ElasticSearch");
Query OK, 1 row affected (0.10 sec)

Now let’s have a look what happens to that when it gets into ElasticSearch:

{
 "_id" : "99999",
 "_type" : "mg",
 "found" : true,
 "_version" : 2,
 "_index" : "msg",
 "_source" : {
 "msg" : "Hello ElasticSearch",
 "id" : "99999"
 }
}

Yay! A nice clean record into ElasticSearch so that we could be searching for the data it contains.

Incidentally, the information was written in using a Document ID made of up of the primary (more on that in a minute), and written into an index and type based on the schema and table.

Obviously we’re writing in a full record here – but keep in mind that this is the replicator and we could have filtered out columns or even tables from the information generated content. We’re trying to keep with the operational perspective of writing everything over to the target that we’ve been asked to.

Also be aware that we do this on a per-row basis. That is, every single row updated/inserted is written as a single entry into the ElasticSearch index.

That said, there are quite a few things that we can control:

  • By default, we treat the incoming schema name as the ElasticSearch ‘index’ and the incoming table name as the ElasticSearch ‘type’. So for example, with the schema ‘blog’ and the table ‘posts’ you are are going to get data written into /blog/posts/ID.You can change this behaviour by setting an explicit index and/or type name – this obviously writes everything into the target with those specific values, regardless of the incoming schema or table name, but maybe you just want one big index of all the data.So, by setting an explicit index of ‘allmybigdata’ and a type ‘rawtext’, everything gets written to /allmybigdata/rawtext/ID.
  • The difficulty with the above approach is it limits your ability to search based on some other values. Maybe the incoming data is from multiple blogs, but you want to be able to perform searches, there’s also an option to embed the schemaname and tablename into the data too:
    {
     "_source" : {
     "id" : "9",
     "source_table" : "mgg",
     "msg" : "Barneyrubble",
     "source_schema" : "msg",
     "idb" : "5",
     "committime" : "2017-05-11 11:30:04.0"
     },
     "_id" : "95",
     "_version" : 1,
     "found" : true,
     "_index" : "msg",
     "_type" : "mgg"
    }
  • You can also see in the above that embed a ‘committime’ if asked too, in case you want to search on that too.
  • Incidentally, one other thing about the above record, it’s actually a compound index from the MySQL side –  you can see that there are to ID fields, ‘id’ and ‘idb’ and the ElasticSearch _id is ’95’
  • The format of the document id is configurable, so you use:
    • The primary key (including compound ones), with everything combined into a single string. I.e key (9,5) becomes 95.
    • The primary key using underscores, I.e. (9,5) becomes 9_5;
    • The schema, table and primary key, I.e. (9,5) in msg.mg becomes msgmg95
    • The schema, table and primary key with underscores, I.e. (9,5) in msg.mg becomes msg_mg_9_5
  • Updates work exactly as you expect – they update the record directly, as we do a *proper* update, so the _version is updated appropriately
  • Deletes work as expected too
  • Document IDs can be configured so that an ElasticSearch auto generated value is used in place of an incoming primary key. However, be aware that if you use this, then we are unable to do deletes or updates, because we cannot track the generated ID and looks up would be expensive.
  • Fortunately, you can ignore errors when performing a delete or update to avoid the problem.

These are all configured through the usual properties, and the defaults look like this:

replicator.applier.dbms.ignoreDeleteErrors=false 
replicator.applier.dbms.ignoreUpdateErrors=false 
replicator.applier.dbms.docIdFormat=pkey 
replicator.applier.dbms.selfGeneratedId=false 
replicator.applier.dbms.useSchemaAsIndex=true 
replicator.applier.dbms.indexName= 
replicator.applier.dbms.useTableAsType=true 
replicator.applier.dbms.typeName= 
replicator.applier.dbms.embedSchemaTable=true 
replicator.applier.dbms.embedCommitTime=true

Currently, all of these are global settings  – I’m toying with the idea of using these as defaults, and then having a separate JSON configuration file that would be able to set these values on a per schema/table basis. I’d be interested to hear if anybody would find this useful. While I like this approach, it would add some processing overhead we might want to avoid. In reality, the better way to do this would be to configure separate services in the replicator to handle that process.

Some things that I am still checking and investigating:

  • Performance – Currently I’m seeing about 125 rows per second into ElasticSearch. This is in a VM with just 2 CPUs and 2GB RAM. I suspect we could increase this.
    I also have not in anyway done a more random workload, like Sysbench, or checked the compatibility with our own multi-threaded/parallel apply.
  • Latency – Latency right now is down in the µs, about where you’d expect. Obviously, it depends on the incoming data, but worth looking at.
  • Start/Stop/Restart – This first version contains *complete* restart ability as you would expect with the replicator. However, I haven’t added support to some of our other tools, like dsctl. I’ll address that in a future release.
  • Datatype Support – I’ve done only a few tables, and nothing substantial like textual or logging data.
  • Currently, we send individual rows as individual REST requests; I dont use the open channel and regular submissions (which might improve performance), or any kind of batching. These are only going to improve large data loads and dumps, rather than more traditional streaming replication

So there’s still some work to do, but the basic process is currently perfectly serviceable.

More important as far as I’m concerned, is that with this basic applier done and ready to be released to the public in our upcoming Tungsten Replicator 5.2.0, which is due at the end of June. That gives us about a month to complete testing and address some of the above issues.

If you would like to test out the new applier for ElasticSearch, please email me (mc.brown@continuent.com). I’m interested to get as much input and testing as possible.

 

 

 

 

Extending the Tungsten Replicator Core JS Filter Functionality

Tungsten Replicator has a really cool feature in that we can filter data as it goes past on the wire.

The replicator itself is written entirely in Java and writing filters for it is not as straightforward as it looks, which is why the much better feature is just to use the JavaScript mechanism and write filters using that tool instead. I’ll save the details for how you can write filters to process and massage data for another time, but right now I wanted to find a good way of improving that JavaScript environment.

There are filters, for example, where I want to be able to load JSON option and configuration files, or write out JSON versions of information, and plenty more.

Mozilla’s Rhino JS environment is what is used to provide the internal JS environment for running filters. The way this is supported is that rather than creating a Rhino JS environment that can do whatever it wants, instead, we create a JS instance specifically for executing the required functions within the filter. One of these instances is created for each filter that is configured in the system (and each batch instance too).

The reason we do this is because for each filter, we want each transaction event that appears in the THL log to get executed through the JS instance where the filter() function in the JS filter is executed with a single argument, the event data.

The limitation of this model is that we dont get the full Rhino environment because we execute the JS function directly, so certain top level items and functions like load() or require(), or utilities like JSON.stringify() are not available. We could do that by changing the way we do the configuration, but that could start to get messy quickly, while also complicating the security aspects of how we execute these components.

There are some messy ways in which we could get round this, but in the end, because I also wanted to add some general functionality into the filters system shared across all JS instances, I chose instead to just load a set of utility functions, written in JavaScript, into the JS instance for the filter. The wonderful thing about JS is that we can write all of the functions in JS, even for classes, methods and functions that aren’t provided elsewhere.

So I chose the path of least resistance, which means loading and executing a core JS file before loading and executing the main filter JS so that. We can place into that JS file all of the utility functions we want to be available to all of the filters.

So, to enable this the first thing we do is update the core Java code when we load the filter JS to load our core utility JS first. That occurs in replicator/src/java/com/continuent/tungsten/replicator/filter/JavaScriptFilter.java, within the prepare() function which is where we instantiate the JS environment based on the code.

String coreutilssrc = properties.getString(“replicator.filter.coreutils”);

// Import the standard JS utility script first
try
 {
 // Read and compile the core script functions
 BufferedReader inbase = new BufferedReader(new FileReader(coreutilssrc));
 script = jsContext.compileReader(inbase, scriptFile, 0, null);
 inbase.close();

 script.exec(jsContext, scope);
 }
catch (IOException e)
 {
 throw new ReplicatorException("Core utility library file not found: "
 + coreutilssrc, e);
 }
catch (EvaluatorException e)
 {
 throw new ReplicatorException(e);
 }

This is really straightforward, we obtain the path to the core utilities script from the configuration file (we’ll look at how we define that later), and then compile that within the jsContext object, where our JavaScript is being executed. We add some sensible error checking, but otherwise this is simple.

It’s important to note that this is designed to load that core file *before* the main filter file just in case we want to use anything in there.

Next, that configuration line, we can add into a default config by creating a suitable ‘template’ file for tpm, which we do by creating the file replicator/samples/conf/filters/default/coreutils.tpl. I’ve put it into the filters section because it only applies to filter environments.

The content is simple, it’s the line with the location of our core utility script:

# Defines the core utility script location
replicator.filter.coreutils=${replicator.home.dir}/support/filters-javascript/coreutils.js

And lastly, we need the script itself, replicator/support/filters-javascript/coreutils.js :

// Core utility JavaScript and functions for use in filters
//
// Author: MC Brown (9af05337@opayq.com)


// Simulate the load() function to additional external JS scripts

function load(filename) {
    var file = new java.io.BufferedReader(new java.io.FileReader(new java.io.File(filename)));

    var sb = "";
    while((line = file.readLine()) != null)
        {
            sb = sb + line + java.lang.System.getProperty("line.separator");
        }

    eval(sb);
}

// Read a file and evaluate it as JSON, returning the evaluated portion

function readJSONFile(path)
{
    var file = new java.io.BufferedReader(new java.io.FileReader(new java.io.File(path)));

    var sb = "";
    while((line = file.readLine()) != null)
        {
            sb = sb + line + java.lang.System.getProperty("line.separator");
        }

    jsonval = eval("(" + sb + ")");

    return jsonval;
}

// Class for reoncstituing objects into JSON

JSON = {
    parse: function(sJSON) { return eval('(' + sJSON + ')'); },
    stringify: (function () {
      var toString = Object.prototype.toString;
      var isArray = Array.isArray || function (a) { return toString.call(a) === '[object Array]'; };
      var escMap = {'"': '\\"', '\\': '\\\\', '\b': '\\b', '\f': '\\f', '\n': '\\n', '\r': '\\r', '\t': '\\t'};
      return function stringify(value) {
        if (value == null) {
          return 'null';
        } else if (typeof value === 'number') {
          return isFinite(value) ? value.toString() : 'null';
        } else if (typeof value === 'boolean') {
          return value.toString();
        } else if (typeof value === 'object') {
          if (typeof value.toJSON === 'function') {
            return stringify(value.toJSON());
          } else if (isArray(value)) {
            var res = '[';
            for (var i = 0; i < value.length; i++)
              res += (i ? ', ' : '') + stringify(value[i]);
            return res + ']';
          } else if (toString.call(value) === '[object Object]') {
            var tmp = [];
            for (var k in value) {
              if (value.hasOwnProperty(k))
                tmp.push(stringify(k) + ': ' + stringify(value[k]));
            }
            return '{' + tmp.join(', ') + '}';
          }
        }
        return '"' + value.toString() + '"';
      };
    })()
  };

For the purposes of validating my process, there are three functions:

  • load() – which loads an external JS file and executes it, so that we can load other JS scripts and libraries.
  • readJSONFile() – which loads a JSON file and returns it as a JSON object.
  • JSON class – which does two things, one is provides  JSON.parse() method for parsing strings as JSON objects into JS objects and the other is JSON.stringify() which will turn a JS object back into JSON

Putting all of this together gives you a replicator where we now have some useful functions to make writing JavaScript filters easier. I’ve pushed all of this up into my fork of the Tungsten Replicator code here: https://github.com/mcmcslp/tungsten-replicator/tree/jsfilter-enhance

Now, one final note. Because of the way load() works, in terms of running an eval() on the code to import it, it does mean that there is one final step to make functions useful. To explain what I mean, let’s say you’ve written a new JS filter using the above version of the replicator.

In your filter you include the line:

load("/opt/continuent/share/myreallyusefulfunctions.js");

Within that file, you define a function called runme():

function runme()
{
     logger.info("I'm a bit of text");
}

Now within myreallyusefulfunctions.js I can call that function fine:

runme();

But from within the JS filter, runme() will raise an unknown function error. The reason is that we eval()‘d the source file within the load() function, and so it’s context is wrong.

We can fix that within myreallyusefulfunctions.js by exporting the name explicitly:

if (runme.name) this[runme.name] = runme;

This points the parent namespace to the runme() in this context, and we put that at the end of myreallyusefulfunctions.js script and everything is fine.

I’m lazy, and I haven’t written a convenient function for it, but I will in a future blog.

Now we’ve got this far, let’s start building some useful JS functions and functionality to make it all work nicely…

Tungsten Replicator 3.0 is Cloudera Enterprise 5 Certified

One of the key platforms I’ve been testing on for the MySQL to Hadoop replication has been Cloudera, largely driven by customer requirements, but it’s also one of the easiest way to get started with Hadoop.

logo_cloudera_certified

What I’m even more pleased about is the fact that we are proud to announce that Tungsten Replicator 3.0 is certified for use on the new Cloudera Enterprise 5 platform. That means that we’re sure that replicating your data from MySQL to Cloudera 5 and have it work without causing problems or difficulties on the Hadoop loading and materialisation.

Cloudera is a great product, and we’re very happy to be working so effectively with the new Cloudera Enterprise 5. Cloudera certainly makes the core operation of managing and monitoring your Hadoop cluster so much easier, while still providing core functionality from the Hadoop family like Hive, HBase and Impala.

What I’m really interested in is the support for Spark, which will allow much easier live-querying and access to data.  That should make some data processing and live data views much easier to build and query further down the line.

Continuent Replication to Hadoop – Now in Stereo!

Hopefully by now you have already seen that we are working on Hadoop replication. I’m happy to say that it is going really well. I’ve managed to push a few terabytes of data and different data sets through into Hadoop on Cloudera, HortonWorks, and Amazon’s Elastic MapReduce (EMR). For those who have been following my long association with the IBM InfoSphere BigInsights Hadoop product, and I’m pleased to say that it’s working there too. I’ve had to adapt Robert’s original script to work with the different versions of the underlying Hadoop tools and systems to make it compatible. The actual performance and process is unchanged; you just use a different JS-based batchloader script to work with different tools.

Robert has also been simplifying some of the core functionality, such as configuring some fixed pre-determined formats, so you no longer have to explicitly set the field and record separators.

I’ve also been testing the key feature of being able to integrate the provisiong of information using Sqoop and merging that original Sqooped data into Hadoop, and then following up with the change data that the replicator is effectively transferring over. The system works exactly as I’ve just described – start the replicator, Sqoop the data, materialise the view within Hadoop. It’s that easy; in fact, if you want a deeper demonstration of all of these features, we’ve got a video from my recent webinar session:

Real Time Data Loading from MySQL to Hadoop with New Tungsten Replicator 3.0

If you can’t spare the time, but still want to know about our Hadoop applier, try our short 5-minute video:

Real-time data loading into Hadoop with Tungsten Replicator

While you’re there, check out the Clustering video I did at the same time:

Continuent Tungsten Clustering

And of course, don’t forget that you can see the product and demos live by attending Percona Live in Santa Clara this week (1st-4th April).