Replicating into ElasticSearch

So here at Continuent we are working on multiple new targets for applying data using Tungsten Replicator. There are so many potential targets out there where people want to replicate data directly into a specific system, sometimes just for a specific data set, table, database or requirement.

Yesterday afternoon, I started working on ElasticSearch – this morning I have it finished!

As with all solutions, the same basic principles apply – want to pull out of MySQL or Oracle and into something else? That’s fine. Want to replicate to HDFS and ElasticSearch? We do that too!

So what does it look like?

Installation operates just like our normal appliers – you just specify the datasource type (ElasticSearch) and the ElasticSearch host name and port:

tools/tpm configure alpha \
--datasource-type=elasticsearch \
--install-directory=/opt/continuent \
--master=ubuntuheterosrc \
--members=elasticsearch \
--replication-host=localhost \
--replication-password=root \
--replication-port=9200 \
--replication-user=root

There are some configurable options, but I’ll get to those later. For right now, let’s just see what happens when you insert some data. Here’s a simple table in MySQL:

mysql> describe mg;
+-------+----------+------+-----+---------+----------------+
| Field | Type     | Null | Key | Default | Extra          |
+-------+----------+------+-----+---------+----------------+
| id    | int(11)  | NO   | PRI | NULL    | auto_increment |
| msg   | char(80) | YES  |     | NULL    |                |
+-------+----------+------+-----+---------+----------------+
2 rows in set (0.00 sec)

And let’s insert some data:

mysql> insert into mg values (99999,"Hello ElasticSearch");
Query OK, 1 row affected (0.10 sec)

Now let’s have a look at what happens to that when it gets into ElasticSearch:

{
 "_id" : "99999",
 "_type" : "mg",
 "found" : true,
 "_version" : 2,
 "_index" : "msg",
 "_source" : {
 "msg" : "Hello ElasticSearch",
 "id" : "99999"
 }
}

Yay! A nice clean record into ElasticSearch so that we could be searching for the data it contains.
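If you want to check the replicated row by hand, the document can be read back over ElasticSearch’s REST interface. The following is a minimal Python sketch, assuming the node from the configuration above is listening on localhost:9200 and that the document lives at msg/mg/99999 as in the output shown. The helper names are made up for illustration and are not part of the replicator.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen


def document_url(host, port, index, doc_type, doc_id):
    """Build the REST URL for a single document: /index/type/id."""
    path = "/".join(quote(str(part), safe="") for part in (index, doc_type, doc_id))
    return f"http://{host}:{port}/{path}"


def fetch_document(host, port, index, doc_type, doc_id):
    """GET the document and decode the JSON reply (needs a running node)."""
    with urlopen(document_url(host, port, index, doc_type, doc_id)) as response:
        return json.load(response)


if __name__ == "__main__":
    # Assumed values, taken from the example above.
    print(document_url("localhost", 9200, "msg", "mg", 99999))
```

Calling fetch_document("localhost", 9200, "msg", "mg", 99999) against a live node should return a dictionary with the same keys as the JSON above.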

Incidentally, the information was written using a Document ID made up of the primary key (more on that in a minute), and written into an index and type based on the schema and table.

Obviously we’re writing in a full record here – but keep in mind that this is the replicator and we could have filtered out columns or even tables from the replicated content. We’re trying to keep with the operational perspective of writing everything over to the target that we’ve been asked to.

Also be aware that we do this on a per-row basis. That is, every single row updated/inserted is written as a single entry into the ElasticSearch index.

That said, there are quite a few things that we can control:

  • By default, we treat the incoming schema name as the ElasticSearch ‘index’ and the incoming table name as the ElasticSearch ‘type’. So for example, with the schema ‘blog’ and the table ‘posts’ you are going to get data written into /blog/posts/ID. You can change this behaviour by setting an explicit index and/or type name – this obviously writes everything into the target with those specific values, regardless of the incoming schema or table name, but maybe you just want one big index of all the data. So, by setting an explicit index of ‘allmybigdata’ and a type ‘rawtext’, everything gets written to /allmybigdata/rawtext/ID.
  • The difficulty with the above approach is that it limits your ability to search based on some other values. Maybe the incoming data is from multiple blogs, but you still want to be able to search by source. For that, there’s also an option to embed the schema name and table name into the data:
    {
     "_source" : {
     "id" : "9",
     "source_table" : "mgg",
     "msg" : "Barneyrubble",
     "source_schema" : "msg",
     "idb" : "5",
     "committime" : "2017-05-11 11:30:04.0"
     },
     "_id" : "95",
     "_version" : 1,
     "found" : true,
     "_index" : "msg",
     "_type" : "mgg"
    }
  • You can also see in the above that we embed a ‘committime’ if asked to, in case you want to search on that too.
  • Incidentally, one other thing about the above record: it actually comes from a compound primary key on the MySQL side – you can see that there are two ID fields, ‘id’ and ‘idb’, and the ElasticSearch _id is ’95’.
  • The format of the document ID is configurable, so you can use:
    • The primary key (including compound ones), with everything combined into a single string, i.e. key (9,5) becomes 95.
    • The primary key using underscores, i.e. (9,5) becomes 9_5.
    • The schema, table and primary key, i.e. (9,5) in msg.mg becomes msgmg95.
    • The schema, table and primary key with underscores, i.e. (9,5) in msg.mg becomes msg_mg_9_5.
  • Updates work exactly as you expect – they update the record directly, as we do a *proper* update, so the _version is updated appropriately
  • Deletes work as expected too
  • Document IDs can be configured so that an ElasticSearch auto-generated value is used in place of an incoming primary key. However, be aware that if you use this, then we are unable to do deletes or updates, because we cannot track the generated ID and lookups would be expensive.
  • Fortunately, you can ignore errors when performing a delete or update to avoid the problem.
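The four document ID formats described above can be sketched in a few lines of Python. This is only an illustration of the naming rules, not the applier’s actual code; the function and parameter names are hypothetical.

```python
def make_doc_id(schema, table, pkey_values, include_source=False, separator=""):
    """Combine primary key values (and optionally schema/table) into one ID.

    include_source and separator are illustrative knobs, not replicator
    property names.
    """
    parts = [str(value) for value in pkey_values]
    if include_source:
        parts = [schema, table] + parts
    return separator.join(parts)


if __name__ == "__main__":
    key = (9, 5)
    print(make_doc_id("msg", "mg", key))
    print(make_doc_id("msg", "mg", key, separator="_"))
    print(make_doc_id("msg", "mg", key, include_source=True))
    print(make_doc_id("msg", "mg", key, include_source=True, separator="_"))
```

One thing the sketch makes visible: without a separator, the compound keys (1,23) and (12,3) both collapse to 123, which may be a reason to prefer one of the underscore forms for compound keys.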

These are all configured through the usual properties, and the defaults look like this:

replicator.applier.dbms.ignoreDeleteErrors=false 
replicator.applier.dbms.ignoreUpdateErrors=false 
replicator.applier.dbms.docIdFormat=pkey 
replicator.applier.dbms.selfGeneratedId=false 
replicator.applier.dbms.useSchemaAsIndex=true 
replicator.applier.dbms.indexName= 
replicator.applier.dbms.useTableAsType=true 
replicator.applier.dbms.typeName= 
replicator.applier.dbms.embedSchemaTable=true 
replicator.applier.dbms.embedCommitTime=true
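To make the index/type options concrete, here is a small Python sketch of how those properties could map an incoming schema and table onto a target index and type. It assumes that switching useSchemaAsIndex or useTableAsType to false is what makes the explicit indexName or typeName take effect; that interaction is my reading of the options, not something confirmed from the applier source, and the function names are made up.

```python
PREFIX = "replicator.applier.dbms."


def parse_properties(text, prefix=PREFIX):
    """Read key=value lines and keep the applier settings, minus the prefix."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        if key.startswith(prefix):
            props[key[len(prefix):]] = value.strip()
    return props


def is_true(value):
    return value.strip().lower() == "true"


def resolve_target(schema, table, props):
    """Return the (index, type) pair a row from schema.table would land in."""
    index = schema if is_true(props.get("useSchemaAsIndex", "true")) else props.get("indexName", "")
    doc_type = table if is_true(props.get("useTableAsType", "true")) else props.get("typeName", "")
    return index, doc_type


if __name__ == "__main__":
    explicit = parse_properties(
        "replicator.applier.dbms.useSchemaAsIndex=false\n"
        "replicator.applier.dbms.indexName=allmybigdata\n"
        "replicator.applier.dbms.useTableAsType=false\n"
        "replicator.applier.dbms.typeName=rawtext\n"
    )
    print(resolve_target("blog", "posts", {}))
    print(resolve_target("blog", "posts", explicit))
```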

Currently, all of these are global settings  – I’m toying with the idea of using these as defaults, and then having a separate JSON configuration file that would be able to set these values on a per schema/table basis. I’d be interested to hear if anybody would find this useful. While I like this approach, it would add some processing overhead we might want to avoid. In reality, the better way to do this would be to configure separate services in the replicator to handle that process.

Some things that I am still checking and investigating:

  • Performance – Currently I’m seeing about 125 rows per second into ElasticSearch. This is in a VM with just 2 CPUs and 2GB RAM. I suspect we could increase this.
    I also have not in any way tested a more random workload, like Sysbench, or checked the compatibility with our own multi-threaded/parallel apply.
  • Latency – Latency right now is down in the µs, about where you’d expect. Obviously, it depends on the incoming data, but worth looking at.
  • Start/Stop/Restart – This first version contains *complete* restart ability as you would expect with the replicator. However, I haven’t added support to some of our other tools, like dsctl. I’ll address that in a future release.
  • Datatype Support – I’ve done only a few tables, and nothing substantial like textual or logging data.
  • Batching – Currently, we send individual rows as individual REST requests; I don’t use an open channel with regular submissions (which might improve performance), or any kind of batching. These are only going to improve large data loads and dumps, rather than more traditional streaming replication.
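For reference, batching in ElasticSearch is normally done through the _bulk endpoint, which takes newline-delimited JSON: one action line followed by one source line per document. The Python sketch below only builds such a request body from a list of rows. It is not how the applier currently works (it sends one request per row, as described above), and the function name and row layout are assumptions for illustration.

```python
import json


def build_bulk_body(index, doc_type, rows, id_field="id"):
    """Build a newline-delimited _bulk body that indexes every row."""
    lines = []
    for row in rows:
        action = {"index": {"_index": index, "_type": doc_type, "_id": str(row[id_field])}}
        lines.append(json.dumps(action))  # what to do and where
        lines.append(json.dumps(row))     # the document itself
    # The bulk format requires a trailing newline after the last line.
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    rows = [
        {"id": 1, "msg": "first row"},
        {"id": 2, "msg": "second row"},
    ]
    print(build_bulk_body("msg", "mg", rows), end="")
```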

So there’s still some work to do, but the basic process is currently perfectly serviceable.

More important, as far as I’m concerned, is that this basic applier is done and ready to be released to the public in our upcoming Tungsten Replicator 5.2.0, which is due at the end of June. That gives us about a month to complete testing and address some of the above issues.

If you would like to test out the new applier for ElasticSearch, please email me (mc.brown@continuent.com). I’m interested to get as much input and testing as possible.


Percona Live 2017

So glad to have had a successful Percona Live last week. Continuent were Diamond Sponsors, and now that we are an independent company again and not part of VMware, we have a little more freedom to get back into the MySQL community.

I had two primary sessions, both on the replicator. One looked specifically at how we get data into Big Data targets; the other covered the general problems of replicating between heterogeneous sources. After the first of those, David from Percona interviewed me to understand a bit more about what I was talking about.

I was also on the keynote panel where we discussed a variety of different topics and you can see the full video of that through the link.

 

Hadoop BoF Session at OSCON

I have a BoF session at OSCON next week:

Migrating Data from MySQL and Oracle into Hadoop

The session is at 7pm Tuesday night – look for rooms D135 and/or D137/138.

Correction: We are now in  E144 on Tuesday with the Hadoop get together first at 7pm, and the Data Migration to follow at 8pm.

I’m actually going to be joined by Gwen Shapira from Cloudera, who has a BoF session on Hadoop next door at the same time, along with Eric Herman from Booking.com. We’ll use the opportunity to talk all things Hadoop, but particularly the ingestion of data from MySQL and other databases into the Hadoop datastore.

As always, it’d be great to meet anybody interested in Hadoop at the BoF, please come along and introduce yourselves, and hopefully I’ll see you next week!

Replicating Oracle Webinar Question Follow-up

We had a really great webinar on Replicating to/from Oracle earlier this month, and you can view the recording of that webinar here.

A good sign of how great a webinar was is the questions that come afterwards, and we didn’t get through them all, so here are all the questions and answers for the entire webinar.

Q: What is the overhead of Replicator on source database with asynchronous CDC?

A: With asynchronous operation there is no substantial CPU overhead (unlike with synchronous operation), but the amount of generated redo logs becomes bigger, requiring more disk space and better log management to ensure that the space is used effectively.

Q: Do you support migration from Solaris/Oracle to Linux/Oracle?

A: The replicator is not certified for use on Solaris; however, it is possible to configure a replicator to operate remotely and extract from a remote Oracle instance. This is achieved by installing Tungsten Replicator on Linux and then extracting from the remote Oracle instance.

Q: Are there issues in supporting tables without Primary Keys on Oracle to Oracle replication?

A: Non-primary key tables will work, but it is not recommended for production as it implies significant overhead when applying to a target database.

Q: On Oracle->Oracle replication, if there are triggers on source tables, how is this handled?

A: Tungsten Replicator does not automatically disable triggers. The best solution is to remove triggers on slaves, or rewrite triggers to identify whether a trigger is being executed on the master or slave and skip it accordingly, although this requires rewriting the triggers in question.

Q: How is your offering different/better than Oracle Streams replication?

A: We like to think of ourselves as GoldenGate without the price tag. The main difference is the way we extract the information from Oracle, otherwise, the products offer similar functionality. For Tungsten Replicator in particular, one advantage is the open and flexible nature, since Tungsten Replicator is open source, released under a GPL V2 license, and available at https://code.google.com/p/tungsten-replicator/.

Q: How is the integrity of the replica maintained/verified?

A: Replicator has built-in real-time consistency checks: if an UPDATE or DELETE doesn’t update any rows, Replicator will go OFFLINE:ERROR, as this indicates an inconsistent dataset.
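The rule is simple enough to sketch. The Python below is an illustration only, using an in-memory SQLite database as a stand-in for the target; the replicator’s own implementation is not shown here and the names are hypothetical. The idea is that a replicated UPDATE or DELETE that touches no rows means the target has drifted from the source, so applying stops instead of carrying on.

```python
import sqlite3


class InconsistentDataError(Exception):
    """Raised when a replicated change matches no rows on the target."""


def apply_row_change(conn, sql, params):
    """Apply one UPDATE/DELETE and insist that it changed at least one row."""
    cursor = conn.execute(sql, params)
    if cursor.rowcount == 0:
        # The replicator would go OFFLINE:ERROR at this point.
        raise InconsistentDataError(f"no rows affected by: {sql} {params}")
    return cursor.rowcount


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE mg (id INTEGER PRIMARY KEY, msg TEXT)")
    conn.execute("INSERT INTO mg VALUES (1, 'hello')")
    apply_row_change(conn, "UPDATE mg SET msg = ? WHERE id = ?", ("changed", 1))
    try:
        apply_row_change(conn, "DELETE FROM mg WHERE id = ?", (42,))
    except InconsistentDataError as error:
        print("stopping:", error)
```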

Q: Can configuration file based passwords be specified using some form of encrypted value for security purposes to keep them out of the clear?

A: We support an INI file format so that you do not have to use the command-line installation process. There is currently no supported option for an encrypted version of these values, but the INI file can be secured so it is only readable by the Tungsten user.
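Locking the file down is a one-liner on most systems; as a sketch in Python, assuming a POSIX host and that the script runs as the user who owns the INI file (the path in the example is a placeholder, not necessarily where your installation keeps it):

```python
import os
import stat


def restrict_to_owner(path):
    """Make the file readable and writable by its owner only (mode 0600)."""
    os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
    # Return the resulting permission bits so the caller can verify them.
    return stat.S_IMODE(os.stat(path).st_mode)


if __name__ == "__main__":
    ini_path = "/path/to/tungsten.ini"  # placeholder path
    if os.path.exists(ini_path):
        print(oct(restrict_to_owner(ini_path)))
```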

Q: Our source DB is Oracle RAC with ~10 instances. Is coherency maintained in the replication from activity in the various instances?

A: We do not monitor the information that has been replicated; but CDC replicates row-based data, not statements, so typical sequence insertion issues that might occur with statement based replication should not apply.

Q: Is there any maintenance of Oracle sequence values between Oracle and replicas?

A: Sequence values are recorded into the row data as extracted by Tungsten Replicator. Because the inserted values, not the sequence itself, are replicated, there is no need to maintain sequences between hosts.

Q: How timely is the replication? Particularly for hot source tables receiving millions of rows per day?

A: CDC is based on extracting the data at an interval, but the interval can be configured. In practice, assuming there are regular inserts and updates on the Oracle side, the data is replicated in real-time. See https://docs.continuent.com/tungsten-replicator-3.0/deployment-oracle-cdctuning.html for more information on how this figure can be tuned.

Q: Can parallel extractor instances be spread across servers rather than through threads on the same server (which would be constrained by network or HBA)?

A: Yes. We can install multiple replicators and tune the extraction of the parallel extractor accordingly. That selection would need to be made manually, but it is certainly possible.

Q: Do you need the CSV file (to select individual tables with the setupCDC.sh configuration) on the master setup if you want all tables?

A: No.

Q: If you lose your slave down the road, do you need to re-provision from the initial SCN number or is there a way to start from a later point?

A: This is the reason for the THL Sequence Number introduced in the extractor. If you lose your slave, you can install a new slave and have it start at the transaction number where the failed slave stopped if you know it, since the information will be in the THL. If not, you can usually determine this by examining the THL directly. There should be no need to re-provision – just to restart from the transaction in the THL on the master.

Q: Regarding a failed slave, what if it failed such that we don’t have a backup, or we wanted to provision a second slave such that it had no initial data?

A: If you had no backups or data, yes, you would need to re-provision with the parallel extractor in order to seed the target database.

Q: Would you do that with the original SCN? If it had been a month or two, is there a way to start at a more recent SCN (e.g. you have to re-run the setupCDC process)?

A: The best case is to have two MySQL slaves and, when one fails, re-provision it from the healthy one. This avoids the setupCDC stage.

However, the replication can always be started from a specific event (SCN) provided that SCN is available in the Oracle undo log space.

Q: How does Tungsten handle Oracle’s CLOB and BLOB data types?

A: Providing you are using asynchronous CDC these types are supported; for synchronous CDC these types are not supported by Oracle.

Q: Can different schemas in Oracle be replicated at different times?

A: Each schema is extracted by a separate service in Replicator, so they are independent.

Q: What is the size limit for BLOB or CLOB column data types?

A: This depends on the CDC capabilities in Oracle, and is not limited within Tungsten Replicator. You may want to refer to the Oracle Docs for more information on CDC: http://docs.oracle.com/cd/B28359_01/server.111/b28313/cdc.htm

Q: Would different versions of Oracle, e.g. Enterprise Edition and Standard Edition, be considered heterogeneous environments?

A: Essentially yes, although the nomenclature is really only a categorization, it does not affect the operation, deployment or functionality of the replicator. All these features are part of the open source product.

Q: Can a 10g database (master) send the data to a 11g database (slave) for use in an upgrade?

A: Yes.

Q: Does the Oracle replicator require the Oracle database to be in archive mode?

A: Yes. This is a requirement for Oracle’s CDC implementation.

Q: How will we be able to revisit this recorded webinar?

A: Slides and a recording from today’s webinar will be available at http://www.slideshare.net/Continuent_Tungsten

 

A New Home for Tungsten in the UK

I was suitably heartened to hear about the new mine opening up in Devon here in the UK to mine the element Tungsten.

I commented on this to my associates at Continuent, where comments were made by Csaba as to the appropriate quotes in the article:

“Tungsten is an extraordinary metal.”

“It’s almost as hard as a diamond and has one of the highest melting points of any mineral.”

“Adding a small amount to steel makes it far harder, far more resistant to stress and heat. The benefits to industry are obvious.”

Leading him to suggest: “Adding a small amount of Tungsten to MySQL makes it far harder, far more resistant to stress and failures. The benefits to industry are obvious.”

I couldn’t possibly agree more!

 

Continuent at Hadoop Summit

I’m pleased to say that Continuent will be at the Hadoop Summit in San Jose next week (3-5 June). Sadly I will not be attending as I’m taking an exam next week, but my colleagues Robert Hodges, Eero Teerikorpi and Petri Versunen will be there to answer any questions you have about Continuent products, and, of course, Hadoop replication support built into Tungsten Replicator 3.0.

If you are at the conference, please go along and say hi to the team. And, as always, if there are any questions please let them or me know.

Real-Time Data Movement: The Key to Enabling Live Analytics With Hadoop

An article about moving data into Hadoop in real-time has just been published over at DBTA, written by me and my CEO Robert Hodges.

In the article I talk about one of the major issues for all people deploying databases in the modern heterogeneous world – how do we move and migrate data effectively between entirely different database systems in a way that is efficient and usable? How do you get the data you need to the database you need it in? If your source is a transactional database, how does that data get moved into Hadoop in a way that makes the data usable to be queried by Hive, Impala or HBase?

You can read the full article here: Real-Time Data Movement: The Key to Enabling Live Analytics With Hadoop

 

Cross your Fingers for Tech14, see you at OSCON

So I’ve submitted my talks for the Tech14 UK Oracle User Group conference which is in Liverpool this year. I’m not going to give away the topics, but you can imagine they are going to be about data translation and movement and how to get your various databases talking together.

I can also say, after having seen other submissions for talks this year (as I’m helping to judge), that the conference is shaping up to be very interesting. There’s a good spread of different topics this year, but I know from having talked to the organisers that they are looking for more submissions in the areas of Operating Systems, Engineered Systems and Development (mobile and cloud).

If you’ve got a paper, presentation, or idea for one that you think would be useful, please go ahead and submit your idea.

I’m also pleased to say that I’ll be at OSCON in Oregon in July, handling a Birds of a Feather (BOF) session on the topic of exchanging data between MySQL, Oracle and Hadoop. I’ll be there with my good friend Eric Herman from Booking.com where we’ll be providing advice, guidance, experiences, and hoping to exchange more ideas, wishes and requirements for heterogeneous environments.

It’d be great to meet you if you want to come along to either conference.

 

 

Revisiting ZFS and MySQL

While at Percona Live this year I was reminded about ZFS and running MySQL on top of a ZFS-based storage platform.

Now I’m a big fan of ZFS (although sadly I don’t get to use it as much as I used to after I shut down my home server farm), and I did a lot of different testing back while at MySQL to ensure that MySQL, InnoDB and ZFS worked correctly together.

Of course today we have a completely new range of ZFS-compatible environments, not least of which are FreeBSD and ZFS on Linux, so I think it’s time to revisit some of my original advice on using this combination.

Unfortunately the presentations and MySQL University sessions back then have all been taken down. But that doesn’t mean the advice is any less valid.

Some of the core advice for using InnoDB on ZFS:

  • Configure a single InnoDB tablespace, rather than configuring multiple tablespaces across different disks, and then let ZFS manage the underlying disk using stripes or mirrors or whatever configuration you want. This avoids you having to restart or reconfigure your tablespaces as your data grows, and moves that out to ZFS which can do it much more easily and while the filesystem and database remain online. That means we can do:
innodb_data_file_path = /zpool/data/ibdatafile:10G:autoextend
  • While we’re talking about the InnoDB data files, the best optimisation you can do is to set the ZFS block size to match the InnoDB block size (16K by default in InnoDB; the example here uses 8K, so use whatever matches your own page size). You should do this *before* you start writing data. That means creating the filesystem and then setting the block size:
zfs set recordsize=8K zpool/data
  • What you can also do is configure a separate filesystem for the InnoDB logs that has a ZPool record size of 128K. That’s less relevant in later versions of ZFS, but actually it does no harm.
  • Switch on I/O compression. Within ZFS this reduces the amount of data physically read from and written to disk, and therefore improves overall I/O times. The compression is lightweight enough to handle the load while still reducing the overall time.
  • Disable the double-write buffer. The transactional nature of ZFS helps to ensure the validity of data written down to disk, so we don’t need two copies of the data to be written to ensure valid recovery from the failures that are normally caused by partial writes of the record data. The performance gain is small, but worth it.
  • Using direct I/O (O_DIRECT in your my.cnf) also improves performance for similar reasons. We can be sure with direct writes in ZFS that the information is written down to the right place. EDIT: Thanks to Yves, this is not currently supported on Linux/ZFS.
  • Limit the Adaptive Replacement Cache (ARC); without doing this you can end up with ZFS using a lot of cache memory that would be better used at the database level for caching record information. We don’t need the block data cache as well.
  • Configure a separate ZFS Intent Log (ZIL), really a Separate Intent Log (SLOG) – if you are not using SSD throughout, this is a great place to use SSD to speed up your overall disk I/O performance. Using SLOG stores immediate writes out to SSD, enabling ZFS to manage the more effective block writes of information out to slower spinning disks. The real difference is that this lowers disk writes, lowers latency, and lowers overall spinning disk activity, meaning the disks will probably last longer, not to mention making your system quieter in the process. For the sake of $200 of SSD, you could double your performance and get an extra year or so out of the disks.
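Pulling the checklist together, here is a small Python sketch that generates the ZFS commands and the my.cnf fragment the advice above implies, without executing anything. The pool, dataset and device names are placeholders, the record size is passed in so you can match your own InnoDB page size, and direct I/O is off by default because of the Linux caveat noted above. ARC limiting is left out because the way to set it differs by platform.

```python
def zfs_commands(pool, dataset, record_size, slog_device=None):
    """Return the zfs/zpool commands as argument lists (nothing is executed)."""
    commands = [
        ["zfs", "set", f"recordsize={record_size}", dataset],
        ["zfs", "set", "compression=on", dataset],
    ]
    if slog_device:
        # Attach a separate intent log device (ideally an SSD) to the pool.
        commands.append(["zpool", "add", pool, "log", slog_device])
    return commands


def my_cnf_fragment(data_file, direct_io=False):
    """Return the InnoDB settings from the list above as my.cnf text."""
    lines = [
        "[mysqld]",
        f"innodb_data_file_path = {data_file}:10G:autoextend",
        "innodb_doublewrite = 0",
    ]
    if direct_io:
        lines.append("innodb_flush_method = O_DIRECT")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Placeholder names; the 8K record size mirrors the example above.
    for command in zfs_commands("zpool", "zpool/data", "8K", slog_device="/dev/sdX"):
        print(" ".join(command))
    print(my_cnf_fragment("/zpool/data/ibdatafile"))
```

Printing the commands rather than running them keeps the sketch safe to try on a machine that has no ZFS pool; review the output before pasting any of it into a shell.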

Surprisingly, not much has changed in these key rules. Perhaps the biggest difference is the change in the price of SSD between when I wrote these original rules and today. SSD is cheap(er) today, so many people can afford SSD as their main disk, rather than their storage format, especially if you are building serious machines.