Data Migration: Understanding the Challenges

Data migration – that is, the practice of sharing and distributing information between databases – requires some very careful consideration. Are you moving the data permanently or temporarily, or sharing it between applications? Do you want to share all of it, or only some of it? Are you changing databases, or moving some of the data so that it can be accessed or used in a more efficient system?

Let’s start by looking at what we mean by a database, and at the myriad different databases that are out there.


 

Walk up to any person at an IT conference or gathering twenty-five years ago and ask them to name a database, and most would probably have selected one of the few tools available at the time. All of those databases would have been the same type: some kind of fixed-record database management system, along the lines of dBase III+ or Oracle.

These had some very specific layouts and formats – the record would have had a fixed size, based on fixed fields, often with fixed widths. The reasons for this were largely technical – the way to store data efficiently was in records of a fixed size, each made up of fields that also had a fixed size. To read a record, you needed the definition and then just extracted the bytes, as shown in Figure 1-1.

Figure 1-1.png

Figure 1-1: Fixed Record and Field Sizes

To access a different record, you could ‘seek’ ahead in the file according to the size of the records and the number of the record you wanted to read or update. For example, to read record number 15 you would skip forward 14 × RECORDSIZE bytes from the start of the file, read RECORDSIZE bytes, and then extract the field data using the known record structure. This meant that records were treated as one big, long block of bytes, as shown here in Figure 1-2.

Figure 1-2.png

Figure 1-2: Fixed Records as a stream of data
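
The seek arithmetic is easy to sketch in a few lines of Python. This is only an illustration, not the on-disk format of dBase or any other real product: the record layout (a 20-byte name, a 10-byte city and a 4-byte integer) and the function names are assumptions I have made up for the example.

```python
import io
import struct

# Hypothetical layout: 20-byte name, 10-byte city, 4-byte unsigned integer age.
# "<" disables alignment padding, so the record is exactly 34 bytes.
RECORD = struct.Struct("<20s10sI")
RECORD_SIZE = RECORD.size


def pack_record(name, city, age):
    """Encode one record; struct pads short strings with NUL bytes to the fixed width."""
    return RECORD.pack(name.encode("ascii"), city.encode("ascii"), age)


def read_record(stream, number):
    """Read record `number` (1-based) by seeking straight to its offset."""
    stream.seek((number - 1) * RECORD_SIZE)
    name, city, age = RECORD.unpack(stream.read(RECORD_SIZE))
    return name.rstrip(b"\0").decode("ascii"), city.rstrip(b"\0").decode("ascii"), age


# Build an in-memory "file" of 20 records, then fetch record 15 directly.
data = io.BytesIO(
    b"".join(pack_record(f"Person {i}", "London", 20 + i) for i in range(1, 21))
)
record_15 = read_record(data, 15)
```

Nothing before record 15 is parsed; the position is calculated from the record size alone, which is exactly why fixed-size records were so attractive.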

In fact, this was a very simple data model that was (and still is) thoroughly practical – many young developers and programmers may well have created a database using this very model. It even works if you use indexes – you can point directly to a record using the same system.

It may surprise you to know that for some databases this is still the fundamental model at the lower levels, although there may be some additional complexities and features. But over those same 25 years things have changed in two different directions: data formats and data diversity. Those two have led to a level of complexity in the database systems that manage the data.

Although it may be useful to understand these low-level formats and how the data is physically stored by the database, the focus of this series is one level higher. We want to consider how the data is structured (fields, records, documents), how it is formatted and encoded, and finally how the entire database appears and is usable within your chosen database system. More importantly, we want to know how to move it all elsewhere. Before we get there, let’s look at the top level: database types.

Database Types

My earliest database – at age eight – was one that I built to catalogue my book collection using my Sinclair ZX81, with the software written entirely in BASIC. By the time I was 13 I had started to build custom applications using dBase III+ to manage my father’s accounts. When I left college, my first job was to move data, first from an old Digital Unix system to the new Sun Solaris 2 using the same database engine, called BRS/Search, and then from BRS/Search to Oracle. BRS/Search was a completely free-form database.

The aim of this process was to move that free-form store into a structured format – Oracle, an RDBMS – and to access it using a front-end built using a Macintosh-specific RDBMS engine called 4th Dimension. In the background, we also started putting different classes of data into the then-brand-new Macintosh-specific database called FileMaker.

Since those early days I’ve worked with (and on) PostgreSQL, MySQL, Oracle, Microsoft SQL Server, Microsoft Access, CouchDB, Berkeley DB, SQLite, Couchbase, MongoDB, Cassandra, DB2, and most recently Hadoop, to name just a few. They all have different characteristics – this is the primary reason they exist at all, in fact – and capturing the essence of each group of databases is our first step on the road to understanding how to move data between these databases.

The point here is not the range of my experience (although hopefully that helps explain the reasoning behind the content here), but to demonstrate that there is a huge array of choice out there today. These databases all have different parameters, different methods of storing data, different supported formats, and a huge array of methods for reading, querying and extracting the information.

But what exactly moves a collection of data from just that – a string of bytes – into a database? And how does that affect how we move data between them? Let’s look at some basic database principles. This will not be new information, but these are vital concepts to understand so that we can translate and refer to these elements through the rest of the series.

Database Principles

What is a database?

That is not an innocent question, and the answer depends entirely on the database system, its type, and the individual solution.

However, it can be summed up in two sentences:

A database enables individual, addressable blocks of information to be stored efficiently. These blocks can also be retrieved, and potentially searched and indexed, so that the information can be retrieved effectively.

Whenever you look at a database and how to store, retrieve and update the information, you need to consider how the information within the database is accessed.

All databases share the same basic principles when it comes to working with the information itself. They must all share the following functionality, referred to as CRUD: Create, Read, Update, Delete.

  • Create – data must be able to be created within the database, and this can be done on a record or block basis, or in a batch mode where data is created in bulk.
  • Read – data must be able to be read back out. By their very nature, all databases must be able to do this on a selective basis, either by record, or by a group of records. More complex databases enable you to achieve this more selectively, for example, by selecting all the cars that are blue, or all the invoices raised for Acme Inc.
  • Update – data must be able to be updated. Again, as with reading, this must be possible on a record-by-record basis. Updates may also involve bulk modification of multiple records and even multiple fields simultaneously.
  • Delete – data must be deletable or removable on a record-by-record basis, involving either single or multiple records simultaneously.
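
As a concrete illustration, here are the four operations run against an in-memory SQLite database using Python’s standard sqlite3 module. SQLite is simply a convenient stand-in; the cars table and its contents are invented for this example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cars (id INTEGER PRIMARY KEY, make TEXT, colour TEXT)")

# Create: a single record, then a batch of records.
conn.execute("INSERT INTO cars (make, colour) VALUES (?, ?)", ("Ford", "blue"))
conn.executemany(
    "INSERT INTO cars (make, colour) VALUES (?, ?)",
    [("Fiat", "red"), ("Volvo", "blue"), ("Saab", "green")],
)

# Read: selectively, all the cars that are blue.
blue = [
    row[0]
    for row in conn.execute(
        "SELECT make FROM cars WHERE colour = ? ORDER BY id", ("blue",)
    )
]

# Update: one record by ID, then several records in one statement.
conn.execute("UPDATE cars SET colour = ? WHERE id = ?", ("black", 2))
conn.execute("UPDATE cars SET colour = 'navy' WHERE colour = 'blue'")

# Delete: remove a single record by ID.
conn.execute("DELETE FROM cars WHERE id = ?", (4,))

remaining = conn.execute("SELECT id, make, colour FROM cars ORDER BY id").fetchall()
```

When you plan a migration, it is worth asking which of these four operations the target system handles well, and which it handles badly or not at all.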

Understanding the significance of these different operations within different databases is important to getting the movement and migration of information correct. Some databases can, by design, only support certain levels of these operations. Some provide implicit and explicit deletion of records, and others may deliberately not support update operations.

To further complicate matters, performance should always be a consideration for certain types of data migration. Most analytical and data warehouse platforms benefit from large, batched, or combined updates. Hadoop, for example, works badly with a large number of small files, because these cannot easily be distributed across the cluster. Hadoop is also, by design, an append-only system, which means updates are more complex to handle.

Contrast this with Memcached, where bulk writes or updates are supported, but where for reasons of cache efficiency you do not want large batches of data to be updated simultaneously as it would invalidate large portions of the cache.

Data Formats

Different databases store and structure information differently. Some use records, some use fields, some use documents. Some expect data to be highly structured, where a single ‘database’ may consist of tens, hundreds or even thousands of different tables for different pieces and types of information. At the opposite end of the scale, some just have a record with no further classification or identification.

How to migrate between these structures will be discussed throughout the series, and the general principles will be examined in closer detail in a future post, when we look at Data Mapping and Transformations.

Datatypes

Depending on the database in use, specific datatypes may be used or enforced on the data that is stored. For example, there may be both character (string) and numeric datatypes. Although it is possible to store numeric information in a string column, there are often benefits to a true numeric type, including more efficient storage (and therefore faster operation), and the ability to perform specific operations, such as a SUM() or AVG() function on a numeric column, without having to translate each individual string into an integer or floating-point value.

Datatypes and their identification and translation are a major focus of a future post on Data Mapping and Transformations.
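
The difference is easy to see even outside a database. The following Python sketch (the values are made up) holds the same three numbers once as strings and once as integers:

```python
# The same three values, stored as strings and as integers.
as_strings = ["10", "9", "100"]
as_numbers = [10, 9, 100]

# Strings are compared character by character, so "100" sorts before "9".
sorted_strings = sorted(as_strings)
sorted_numbers = sorted(as_numbers)

# Numeric values can be aggregated directly; strings must be converted first.
total = sum(as_numbers)
total_from_strings = sum(int(value) for value in as_strings)
```

If a numeric column arrives in the target database as text, sorting and aggregation can quietly stop behaving the way the source system did, which is why type mapping matters during a migration.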

Indexes

All databases are predicated on the need to access the information within them very quickly. Consider a simple contact database with just 20 records in it. To look for the record with the name ‘MC Brown’ in it requires us to look at every record until we find the matching one. Of course, there may be more than one such record, so even if we find that the first record matches, we still have to iterate over 20 records to find all the matching entries.

With 20 records this isn’t a problem, but with 20,000,000 records it is hopelessly inefficient. Indexes bridge the gap by allowing the database to be addressed more efficiently. There are different algorithms for creating indexes that are beyond the scope of this text, but in all cases, the role of the index is to provide quicker access to information than could be achieved through a sequential scan.
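
To make the contrast concrete, here is a minimal Python sketch of a sequential scan next to a lookup through a simple hash index. Real databases typically use more sophisticated structures such as B-trees; the contact records and function names here are invented for the example.

```python
from collections import defaultdict

contacts = [
    {"id": 1, "name": "MC Brown", "city": "London"},
    {"id": 2, "name": "A Smith", "city": "Leeds"},
    {"id": 3, "name": "MC Brown", "city": "York"},
]


def scan(records, name):
    """Sequential scan: every record is examined, whatever the outcome."""
    return [record["id"] for record in records if record["name"] == name]


def build_index(records, field):
    """Build a simple hash index mapping a field value to record positions."""
    index = defaultdict(list)
    for position, record in enumerate(records):
        index[record[field]].append(position)
    return index


def lookup(records, index, name):
    """Indexed lookup: jump straight to the positions of the matching records."""
    return [records[position]["id"] for position in index.get(name, [])]


name_index = build_index(contacts, "name")
```

Both approaches return the same answer. The difference is that scan() touches every record on every query, while lookup() pays the cost once, when the index is built, and has to keep the index up to date afterwards.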

Database Types

There are myriad ways in which you can identify and classify different databases, and the way you divide them up depends on what aspect of the database you are looking at. For example, SQL was for a long time associated exclusively with structured RDBMS engines, but has now become a data interface standard of its own and is used in both RDBMS and non-RDBMS environments. For the purposes of our understanding, we’ll examine them according to how they organise and classify their data.

Through the rest of this series, we concentrate on three major types: the RDBMS, NoSQL and Big Data.

Structured and Relational Database Management Systems (RDBMS)

Examples: Oracle, MySQL, PostgreSQL, Microsoft SQL Server, Microsoft Access, FileMaker Pro

Most structured database systems tend to have a relational database core (RDBMS), and most often, but not always, are accessed through the Structured Query Language (SQL). When talking to people about any databases, an RDBMS and SQL is what people will think of first, because it matches the idea of a strict database and types. The highly structured and rigid nature requires a rigid method of storing and retrieving information. It also places limitations on your database types and structure. A simple layout is shown in Figure 1-3.

Figure 1-3.png

Figure 1-3: A structured RDBMS table diagram

Structured databases have a few specific characteristics:

  • Strict data structure – data is stored within fixed named silos (databases), within named tables, and with each table having a fixed number of named columns. Every single record within each table has the same number of fields (columns), and each column is used for a specific purpose or piece of information.
  • Strict data types – for example, an RDBMS will store integers and floats differently, and may have additional data types designed to provide fast access to specific information, for example, the SET and ENUM types within MySQL.
  • Data Definition Language (DDL) – related to the elements above, the DDL within any database is important because it provides a reference structure which can be used to replicate that structure in other databases. Depending on the database system, the DDL may either be implicit in the way the data is accessed or stored, or in the API and interfaces provided, or the DDL could be more explicit, as in the dialects in SQL and similar statement-based interfaces.
  • Data Manipulation Language (DML) – typically, but not always, SQL. The DML enables you to perform the CRUD operations that manage the information. Like DDL, the exact interface is very database-specific. Some databases and systems rely entirely on a statement-based language like SQL, which has its own dialects and structures for performing the updates. Others rely entirely on the API that interfaces between client applications and the database storage.
  • Relational capability – because the data is in a fixed format and with fixed types, it is possible to create specific relations between a field in one table and fields in other tables. This enables the data to be JOINed together to provide a unified output. For example, if you have orders and invoices, it’s possible to link the order and the invoice by a unique ID, and the database can either use or explicitly enforce the relationship. Joins are further characterised by their type, enabling many-to-one relationships (for example, multiple invoices relating to one client), one-to-many relationships (one invoice number referring to multiple invoice lines) and one-to-one relationships (invoice to payment received).
  • Constraints and Indexes – constraints enable data to be created within a limited subset, or to identify rows uniquely. For example, a primary key constraint can force the table to create new records only with a new unique identifier. Indexes are used to create efficient methods for looking up and identifying data according to criteria. Within an RDBMS, indexes are generally used on a specific column, or multiple columns, to improve the speed of access during specific queries. Without an index, the RDBMS will default to performing a full table scan.
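
Several of these characteristics can be seen together in a short sketch. It uses Python’s sqlite3 module with an in-memory SQLite database as a stand-in for a full RDBMS; the clients and invoices tables are invented for the example, and other engines will differ in dialect and in how strictly they enforce types.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite only enforces foreign keys on request

# DDL: fixed, named, typed columns, plus constraints and an index.
conn.executescript("""
CREATE TABLE clients (
    id   INTEGER PRIMARY KEY,
    name TEXT NOT NULL
);
CREATE TABLE invoices (
    id        INTEGER PRIMARY KEY,
    client_id INTEGER NOT NULL REFERENCES clients(id),
    total     REAL NOT NULL
);
CREATE INDEX invoices_by_client ON invoices (client_id);
""")

# DML: insert rows, then JOIN many invoices to one client.
conn.execute("INSERT INTO clients (id, name) VALUES (1, 'Acme Inc')")
conn.executemany(
    "INSERT INTO invoices (client_id, total) VALUES (?, ?)",
    [(1, 100.0), (1, 250.5)],
)
rows = conn.execute(
    """SELECT c.name, i.total
       FROM invoices AS i JOIN clients AS c ON c.id = i.client_id
       ORDER BY i.id"""
).fetchall()

# The foreign key constraint rejects an invoice for a client that does not exist.
try:
    conn.execute("INSERT INTO invoices (client_id, total) VALUES (99, 10.0)")
    rejected = False
except sqlite3.IntegrityError:
    rejected = True
```

The CREATE statements are the part a migration can reuse: they describe the structure that has to be recreated, or deliberately reshaped, in the target database.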

Structured/RDBMS solutions provide some of the easiest methods for exchanging data – it is generally easier to move data from a structured store to elsewhere. However, most destination databases do not support the same range of indexes. Conversely, moving data from unstructured databases of any kind into a Structured/RDBMS is harder, because you have to decide what goes where.

NewSQL Databases

Examples: Clustrix, VoltDB, InfiniDB, TokuDB

Traditional RDBMS and SQL databases are designed to run on a single machine. This has performance and hardware limitation issues. There is only so much memory and hard disk space that can be installed in a single machine, and if your database or performance requirements are high enough, a single server is not the solution. There are strategies, such as sharding the database (specifically splitting it up by an identifiable key, such as ID, name or geographical location), or more specifically dividing the database across machines, but these place a different load on your application layer, and are beyond the scope of this series.
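
Although a full treatment is out of scope, the basic idea of splitting by key fits in a few lines of Python. This is a minimal sketch of one possible approach (hashing the key), not a recommendation; the server names are hypothetical.

```python
import hashlib

SHARDS = ["db-host-0", "db-host-1", "db-host-2"]  # hypothetical server names


def shard_for(key, shards=SHARDS):
    """Pick a shard from a stable hash of the key.

    hashlib is used rather than the built-in hash(), which is randomised
    per process for strings and so would not give a stable mapping.
    """
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return shards[int.from_bytes(digest[:8], "big") % len(shards)]
```

The application now has to call something like shard_for() before every query, and adding a server changes the mapping for most keys. That is the extra load on the application layer mentioned above.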

NewSQL databases are a modification of the Structured/RDBMS that use multiple machines in a cluster to support the database requirements. Unlike the sharding and other methods, NewSQL solutions automatically distribute the load across the machines and handle the interface, indexing and querying required to access the data.

The main elements of the database and structure, such as databases, records and fields, and all other data migration considerations are the same as for traditional RDBMS environments.

NoSQL/Document Databases

Examples: Couchbase, CouchDB, MongoDB, Cassandra, HBase

NoSQL databases actually span a wide range of different databases, originally classified by their rejection of SQL as the DDL and DML language of choice, more usually resorting to the use of a direct API for accessing information. There was a resurgence of these different solutions in the early 2000s as people sought alternatives that were faster and simpler than the transactional RDBMS for web applications and websites.

Most NoSQL databases rely on simpler methods for accessing the information, for example by using a single document ID to retrieve a record of information. This document ID could be derived from the user’s email address, so when a user logs in or registers on a website, the document associated with that email address is accessed, rather than ‘looking up’ the record in a larger table of user records.

NoSQL databases of this type can be roughly split into two groups, the columnar/tabular databases, and the document databases. The columnar/tabular type includes Cassandra, Apache HBase (part of Hadoop), and Google’s Bigtable. Data is organised through an identifiable row ID, and a collection of associated column IDs that classify the data structure. They can look, and even act and operate, in a similar fashion to the structured RDBMS table/row/column structure. A sample column-style database (in this case Cassandra) looks roughly like that in Figure 1-4.

Figure 1-4.png

Figure 1-4: A columnar (Cassandra) database structure

Document databases are completely different. Instead of a table structure, data is organised into a document, usually using JSON or a JSON-like structure. A document often combines different fragments of information together – for example, a contact record may store all the phone numbers, email addresses and other components within the single document for a given person. Documents, especially JSON-based documents, are also very flexible and can contain fields that are nested, such as an array of phone numbers, or even entire nested structures, such as the individual rows (qty, product id, description, price) for an invoice or order, all encapsulated into a single document. A simple document database structure can be seen in Figure 1-5.

Figure 1-5.png

Figure 1-5: Document Databases
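
For a feel of what such a document looks like in practice, here is a small Python sketch using the standard json module. The order document and its field names are invented for the example; they are not the schema of any particular product.

```python
import json

order = {
    "id": "order-1001",
    "customer": {"name": "Acme Inc", "emails": ["accounts@example.com"]},
    "lines": [
        {"qty": 2, "product_id": "P-1", "description": "Widget", "price": 9.5},
        {"qty": 1, "product_id": "P-7", "description": "Gadget", "price": 20.0},
    ],
}

# The document travels as a single serialised value...
serialised = json.dumps(order)

# ...and comes back whole, nested rows included.
restored = json.loads(serialised)
order_total = sum(line["qty"] * line["price"] for line in restored["lines"])
```

In an RDBMS the same information would normally be spread over at least three tables (orders, customers and order lines) and reassembled with joins.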

Perhaps most importantly, documents in a document database do not need to be identical. In a structured RDBMS environment, every record contains every field, even if the field is not actually used for that record. In a document database, different documents, even within the same database or group, may have only one field, or may have 20. This variable nature makes them appealing, but it also represents an area of complexity when migrating information.

Most NoSQL systems have no idea of an explicit relation or join – this is often one of the aspects that makes the system faster. However, the lack of this element means that different techniques are required to store and interact with complex data.

Depending on the NoSQL solution, you may or may not have access to an index or quicker method of accessing the data. In CouchDB and Couchbase, for example, the fields of a document can be used to generate an index that provides quick searching and retrieval of information.

NoSQL databases can be easy to interact with and to migrate data to and from, depending on whether or not a strict schema is involved. For example, moving from an RDBMS to a document-based NoSQL database can be a case of converting the table records into documents identified by the primary key. It can also pay off in the long term to perform a more concerted conversion and translation of the source tables into unified documents.
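
Both approaches can be sketched in Python, again with an in-memory SQLite database standing in for the source RDBMS. The tables, the order:: key prefix and the document layout are all assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows can then be converted to dicts by column name
conn.executescript("""
CREATE TABLE orders (id INTEGER PRIMARY KEY, customer TEXT);
CREATE TABLE order_lines (order_id INTEGER, product TEXT, qty INTEGER);
INSERT INTO orders VALUES (1, 'Acme Inc'), (2, 'Globex');
INSERT INTO order_lines VALUES (1, 'Widget', 2), (1, 'Gadget', 1), (2, 'Widget', 5);
""")

# Simple conversion: one document per row, keyed by the primary key.
documents = {
    f"order::{row['id']}": dict(row)
    for row in conn.execute("SELECT * FROM orders ORDER BY id")
}

# Fuller conversion: fold the related rows into each document.
for doc in documents.values():
    lines = conn.execute(
        "SELECT product, qty FROM order_lines WHERE order_id = ? ORDER BY rowid",
        (doc["id"],),
    )
    doc["lines"] = [dict(line) for line in lines]
```

The first step alone gives you a working but table-shaped document store. The second step is where the join disappears, because the related rows now travel with their parent.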

Key/value (KV) Stores

Examples: Memcached, Redis, Riak

In most broad classifications, key/value stores are treated as NoSQL, but I’ve split them out here because they have some interesting attributes that affect data exchange. A key/value store is exactly what it sounds like. A single blob of data (the value) is stored against a given key identifier. You store the information by giving the key, and retrieve the information by giving the same key. In most cases, the information can only be retrieved if you know the key. Iteration over the stored data and indexes are generally not available.

The roots of the key/value store go back to the attempt to speed up access to data where a given identifier is known, such as user ID or email address. The best-known key/value store is probably Memcached, which was originally developed to make use of the spare RAM of machines supporting a website (LiveJournal, a blogging platform) and enable fast access to blog entries. Since the ID of the blog could be derived from the URL being accessed, the entry could easily be looked up in Memcached. If it didn’t exist, it was looked up from a MySQL database, and the formatted/retrieved version placed into the cache with the identifying URL.
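
That look-up-then-fall-back pattern can be sketched in Python. Plain dictionaries stand in for both Memcached and MySQL here, so this shows only the logic, not a real client; the URLs and entries are invented.

```python
cache = {}  # stand-in for the key/value store
database = {"/blog/1": "First post", "/blog/2": "Second post"}  # stand-in for MySQL
stats = {"database_reads": 0}


def fetch_entry(url):
    """Return the entry for a URL, consulting the cache before the database."""
    if url in cache:
        return cache[url]
    stats["database_reads"] += 1  # cache miss: fall back to the database
    entry = database.get(url)
    if entry is not None:
        cache[url] = entry  # store it under the identifying URL for next time
    return entry


first = fetch_entry("/blog/1")   # miss: read from the database, then cached
second = fetch_entry("/blog/1")  # hit: served from the cache
```

Notice that the only way in is the key. There is no way to ask the cache for ‘all entries by this author’, which is part of what makes key/value stores awkward as a migration source.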

Most document databases are really a modification of the key/value store. The value portion can be any data you like, from a simple string, through to a serialised object from C, Java or other languages, or a JSON document. In fact, some databases actually support both, and the only distinction between a key/value store and a document database is whether the database engine itself can identify and interact with the embedded structure. MongoDB and Couchbase, for example, have this distinction; MongoDB enables the database engine to update fields within the BSON (JSON-like) values, while Couchbase supports indexing of the JSON fields.

Key/Value stores are some of the harder databases to migrate and move data between. The lack of a structure, or the custom nature (for example a serialised language object), and the requirement to identify the record by a specific ID make exchanging data more complex.

Big Data (aka Unstructured, Semi-structured and Implied Structure Databases)

Examples: Hadoop, Apache Solr, ElasticSearch, Lucene

BRS/Search was, for the time and technology, relatively groundbreaking in that it was a full-text retrieval system. Today we would probably classify this as a ‘document’ based database, that is, one that has a structured format, although the power behind BRS/Search was the ability to perform a free-text search across an entire collection.

Today, we generally refer to these types of database as unstructured, that is, there is no discernible format or structure to the information. Although there are many different examples of this, probably the best known today is Hadoop. Without getting into the functionality or history of Hadoop, its power comes from its ability to distribute the raw data and to process the unstructured data into usable information.

Within Hadoop, the normal workflow is to load Hadoop with raw data, for example, the text from tweets, or web pages, and then use that information to build an index or data structure around the information so that it can be analysed or searched. Solutions such as Solr, Lucene and ElasticSearch work in similar ways, accessing the raw text and either indexing it so that the data can be searched, or using the structure that is available to provide searching and indexing by a more specific area.
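
The general idea of building a searchable structure around raw text is the inverted index, which maps each word to the documents that contain it. The Python below is a toy sketch of that idea only; it is not how Lucene, Solr or ElasticSearch are implemented, and the sample documents are invented.

```python
import re
from collections import defaultdict

documents = {
    1: "Moving data into Hadoop",
    2: "Searching raw text with an index",
    3: "Hadoop can index raw data",
}


def tokenize(text):
    """Lower-case the text and split it into simple alphanumeric words."""
    return re.findall(r"[a-z0-9]+", text.lower())


def build_inverted_index(docs):
    """Map every word to the set of document IDs that contain it."""
    index = defaultdict(set)
    for doc_id, text in docs.items():
        for word in tokenize(text):
            index[word].add(doc_id)
    return index


def search(index, query):
    """Return the IDs of documents containing every word in the query."""
    words = tokenize(query)
    if not words:
        return set()
    result = set(index.get(words[0], set()))
    for word in words[1:]:
        result &= index.get(word, set())
    return result


text_index = build_inverted_index(documents)
```

The raw text is left untouched. The index is a separate, derived structure, and it is usually the raw data rather than the index that you would migrate.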

This is an example where ‘semi-structured’ data applies. Twitter data, for example, consists of the Twitter name, the tweet itself, and any tags or Twitter users the tweet was directed to. The fixed fields and the tweet go together to make it semi-structured, as it consists of both structured and free-form information.

Implied structure databases are those where the structure of the data is implied by the database, even though the underlying data may only be partially structured and described. Apache Hive, part of Hadoop, is an example of this. Hive can natively read text files and interpret them with a specific structure, converting CSV files into columns so that they can be queried by HiveQL, a simplified form of SQL. Hive can also parse more complex data, including CSV that embeds JSON and serialised data structures, all so they can be queried through a familiar interface.

However, unlike a true RDBMS, Hive only interprets the underlying format, and it performs this interpretation every time the data is accessed. At no time does the data have to be translated into Hive format (nor, really, is there one), and no indexes are created to enable quick access to the data.
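
This interpret-on-read behaviour can be imitated in a few lines of Python. To be clear, this is not Hive code, only a sketch of the principle; the sample data, the schema and the function name are assumptions made for the example.

```python
import csv
import io

# Raw text as it might sit in storage; it carries no type information of its own.
RAW = "1,Widget,9.50\n2,Gadget,20.00\n"

# The structure lives outside the data: column names and the type to apply to each.
SCHEMA = [("id", int), ("name", str), ("price", float)]


def read_with_schema(raw_text, schema):
    """Interpret the raw text on every call; nothing is converted or stored in advance."""
    for fields in csv.reader(io.StringIO(raw_text)):
        yield {name: cast(value) for (name, cast), value in zip(schema, fields)}


rows = list(read_with_schema(RAW, SCHEMA))
expensive = [row["name"] for row in read_with_schema(RAW, SCHEMA) if row["price"] > 10]
```

Every query re-parses the text, which is the trade-off described above: there is no load or conversion step, but there is also no index to fall back on.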

All of these individual types are wrapped up into what I’ve classed as ‘Big Data’. This is not to say that the data needs to be of specific size or complexity, only that it may consist of structured, unstructured, or all variants in between.

Moving data to and from unstructured, semi-structured, and implied structure databases entirely depends on what the information is, what structure is available, and how that structure can be used (or ignored) accordingly.

Process home monitoring data using the Time Series Database in Bluemix

I keep a lot of information about my house – I have had sensors and recording units in various parts of my house for years, recording info through a variety of different devices.

Over the years I’ve built a number of different solutions for storing and displaying the information, and when the opportunity came up to write about a database built specifically for recording this information I jumped at the chance, and this is what I came up with:

As home automation increases, so does the number of sensors recording statistics and information needed to feed that data. Using the Time Series Database in BlueMix makes it easy to record the time-logged data and query and report on it. In this tutorial, we’ll examine how to create, store, and, ultimately, report on information by using the Time Series Database. We’ll also use the database to correlate data points across multiple sensors to track the effectiveness of heating systems in a multi-zone house.

You can read the full article here

Real-Time Data Movement: The Key to Enabling Live Analytics With Hadoop

An article about moving data into Hadoop in real-time has just been published over at DBTA, written by me and my CEO Robert Hodges.

In the article I talk about one of the major issues for all people deploying databases in the modern heterogeneous world – how do we move and migrate data between entirely different database systems in a way that is efficient and usable? How do you get the data you need to the database you need it in? If your source is a transactional database, how does that data get moved into Hadoop in a way that makes the data usable to be queried by Hive, Impala or HBase?

You can read the full article here: Real-Time Data Movement: The Key to Enabling Live Analytics With Hadoop

 

Harvest machine data using Hadoop and Hive

A new article has been published on IBM developerWorks, looking at the basics of processing machine data using Hadoop, from extracting the core data and storing it, to determining the baselines and trigger points required to identify worrying trends and points. From the intro:

Machine data can come in many different formats and quantities. Weather sensors, fitness trackers, and even air-conditioning units produce massive amounts of data, which begs for a big data solution. But how do you decide what data is important, and how do you determine what proportion of that information is valid, worth including in reports, or valuable in detecting alert situations? This article covers some of the challenges and solutions for supporting the consumption of massive machine data sets that use big data technology and Hadoop.

Harvest machine data using Hadoop and Hive.

MySQL to Hadoop Step-By-Step

We had a great webinar on Thursday about replicating from MySQL to Hadoop (watch the whole thing). It was great, but one of the questions at the end was ‘is there an easy way to test’.

Sadly we can’t go giving out convenient ready-to-run downloads of these things because of licensing and other complexities, so I want to try and make it as simple and straightforward as possible by giving you the directions to complete it yourself. I’m going to point to the Continuent Documentation every now and then so this is not too crowded, but we should get through it pretty easily.

Major Decisions

For this to work: 

  • We’ll set up two VMs, one the master (running MySQL), the other the slave (running Cloudera)
  • The two VMs must be able to reach each other on the network. It doesn’t matter whether they are running Internal, NAT, or bridge-mode network, they just need to be able to ping and SSH each other. Switch off firewalls to prevent port weirdness.
  • For convenience, update your /etc/hosts to have a host1 (the master) and host2 (the slave)
  • The master must have followed the prereqs; for the slave it’s optional, but highly recommended

With that in mind, let’s get started.

Step 1: Setup your Master Host

There are a number of ways you can do this. If you want to simplify things and have VirtualBox, try downloading this VM. It’s a 1.5GB download containing an OVF VM of an Ubuntu host, with our prerequisites already followed. To use this:

  1. Uncompress the package.
  2. Import the VM into your VirtualBox.
  3. If you want, change the network type from Internal to a bridged or NAT environment.

Using internal networking, you can log in to this using:

shell> ssh -p2222 tungsten@localhost

Passwords are ‘password’ for tungsten and root.

If you don’t want to follow this but want your own VM:

  1. Create a new VM with 1-2GB of RAM, and 8GB or more of disk space
  2. Install your OS of choice, either Ubuntu or CentOS
  3. Follow our prerequisite instructions
  4. Make sure MySQL is set up and running, and that binary logging is enabled
  5. Make sure it has a valid IP address

Step 2: Setup Tungsten Replicator

Download the latest Tungsten Replicator binary from this page

Unpack the file:

shell> tar zxf tungsten-replicator-3.0.tar.gz

Change into the directory:

shell> cd tungsten-replicator-3.0

Create a new replicator installation. This will read from the binary log into THL:

shell> ./tools/tpm install alpha \
--install-directory=/opt/continuent \
--master=host1 \
--members=host1 \
--java-file-encoding=UTF8 \
--java-user-timezone=GMT \
--mysql-enable-enumtostring=true \
--mysql-enable-settostring=true \
--mysql-use-bytes-for-string=false \
--svc-extractor-filters=colnames,pkey \
--property=replicator.filter.pkey.addColumnsToDeletes=true \
--property=replicator.filter.pkey.addPkeyToInserts=true \
--replication-password=password \
--replication-user=tungsten \
--skip-validation-check=HostsFileCheck \
--skip-validation-check=ReplicationServicePipelines \
--start-and-report=true

For a full description of what’s going on here, see this page and click on the magnifying glass. You’ll get the full description of each option.

To make sure everything is OK, you should see status output generated by trepctl. If the replicator is running and shows its state as online, we’re ready.

Step 3: Get your Cloudera Host Ready

There are lots of ways to get Cloudera’s Hadoop solution installed. The ready-to-run VM is the simplest by far.

  1. Download the Cloudera VM quick start host from here; there are versions for VirtualBox and VMware and KVM.
  2. Set the networking type to match the master.
  3. Start the host
  4. Set the hostname to host2
  5. Update the networking to an IP address that can talk to the master.
  6. Update /etc/hosts to add the IP address of host1 and host2 e.g.:

192.168.0.2 host1

Add a ‘tungsten’ user which we will use to install Tungsten Replicator.

Step 4: Install your Hadoop Slave

Download the latest Tungsten Replicator binary from this page

Unpack the file:

shell> tar zxf tungsten-replicator-3.0.tar.gz

Change into the directory:

shell> cd tungsten-replicator-3.0

Create a new replicator installation; this will read the information from the master (host1) and apply it to this host (host2):

shell> ./tools/tpm install alpha \
--batch-enabled=true \
--batch-load-language=js \
--batch-load-template=hadoop \
--datasource-type=file \
--install-directory=/opt/continuent \
--java-file-encoding=UTF8 \
--java-user-timezone=GMT \
--master=host1 \
--members=host2 \
'--property=replicator.datasource.applier.csv.fieldSeparator=\\u0001' \
--property=replicator.datasource.applier.csv.useQuotes=false \
--property=replicator.stage.q-to-dbms.blockCommitInterval=1s \
--property=replicator.stage.q-to-dbms.blockCommitRowCount=1000 \
--replication-password=secret \
--replication-user=tungsten \
--skip-validation-check=DatasourceDBPort \
--skip-validation-check=DirectDatasourceDBPort \
--skip-validation-check=HostsFileCheck \
--skip-validation-check=InstallerMasterSlaveCheck \
--skip-validation-check=ReplicationServicePipelines \
--start-and-report=true

For a description of the options, visit this page and click on the second magnifying glass to get the description.

As before, we want everything to be running and the replicator to be online. To check, run:

shell> trepctl status

This should tell you everything is running – if you get an error about the command not being found, source the environment to populate your PATH correctly:

shell> source /opt/continuent/share/env.sh

We want everything to be online and running. If it isn’t, use the docs to help determine the reason, or use our discussion group to ask questions.

Step 5: Generating DDL

For your chosen MySQL database schema, you need to generate the staging and live table definitions for Hive.

A tool, ddlscan, is provided for this. You need to run it and provide the JDBC connect string for your database, and your user and password. If you followed the prereqs, use the one for the tungsten user.

First create the live table DDL:

shell> ddlscan -user tungsten -url 'jdbc:mysql://host1:3306/test' -pass password -template ddl-mysql-hive-0.10.vm -db test > schema.sql

Now apply it to Hive:

shell> cat schema.sql | hive

To create Hive tables that read the staging files loaded by the replicator, use the ddl-mysql-hive-0.10-staging.vm template on the same database:

shell> ddlscan -user tungsten -url 'jdbc:mysql://host1:3306/test' -pass password -template ddl-mysql-hive-0.10-staging.vm -db test > schema-staging.sql

Now apply it to Hive again:

shell> cat schema-staging.sql | hive

Step 6: Start Writing Data

Hopefully by this point you’ve got two VMs. On one, you have MySQL running and the master replicator extracting info from the MySQL binary log. On the other, you have a basic Cloudera instance with a slave replicator writing changes. Both replicators should be online (use ‘trepctl status’ to check).

All you need to do is start writing data into the tables you selected when creating the DDL. That should be it – you should see data start to stream into Hadoop.

Real-Time Replication from MySQL to Cassandra

Earlier this month I blogged about our new Hadoop applier, and I published the docs for it this week (http://docs.continuent.com/tungsten-replicator-3.0/deployment-hadoop.html) as part of the Tungsten Replicator 3.0 documentation (http://docs.continuent.com/tungsten-replicator-3.0/index.html). It contains some additional interesting nuggets that will appear in future blog posts.

The main part of that functionality, the piece that does the actual applying for Hadoop, is based around a JavaScript applier engine – there will eventually be docs for that as part of the Batch Applier content (http://docs.continuent.com/tungsten-replicator-3.0/deployment-batchloading.html). The core of this system is that it takes the information from the data stream of the THL and the CSV file that was written by the batch applier system, and runs the commands necessary to load it into Hadoop and perform any necessary merges.

I wanted to see how easy it would be to take that same flexible system and bend it to another database system; in my case, I chose Cassandra.

For the record, it took me a couple of hours to have this working, and I’m guessing another hour will file down some of the rough edges.

Cassandra is interesting as a database because it mixes a big distributed key/value store with a close-enough-to-SQL interface in the form of CQL. And that means we can make use of CQL to help us perform the merging into the final tables in a manner not dissimilar to the method we use for loading into Vertica.

Back to the JavaScript batch loader: the applier provides five different implementable functions (all are technically optional) that you can use at different stages of the applier process. These are:

  • prepare() – called once when the applier goes online and can be used to create temporary directories or spaces
  • begin() – called at the start of each transaction
  • apply() – called at the end of the transaction once the data file has been written, but before the commit
  • commit() – called after each transaction commit has taken place; this is where we can consolidate info.
  • release() – called when the applier goes offline

We can actually align these functions with a typical transaction – prepare() happens before the statements even start, begin() is the same as BEGIN, apply() happens immediately before COMMIT and commit() happens just after. release() can be used to do any clean up afterwards.
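To make that ordering concrete, here is a minimal Ruby model of the call sequence. It is not the replicator's code: `run_applier` is a hypothetical driver written only for illustration, and each hook does nothing more than record that it was called.

```ruby
# Hypothetical model of the batch applier lifecycle; not Tungsten code.
# Each step only records its name so the call order can be inspected.
def run_applier(transactions)
  calls = []
  calls << 'prepare'                # once, when the applier goes online
  transactions.each do |txn|
    calls << "begin(#{txn})"        # start of the transaction
    calls << "apply(#{txn})"        # data file written, before the commit
    calls << "commit(#{txn})"       # after the commit has taken place
  end
  calls << 'release'                # once, when the applier goes offline
  calls
end

puts run_applier(%w[t1 t2]).join(' -> ')
```

For two transactions this prints prepare once, then a begin/apply/commit triple per transaction, then release once.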

So let’s put this into practice and use it for Cassandra.

The basic process for loading is as follows:

  1. Write a CSV file to load into Cassandra
  2. Load the CSV file into a staging table within Cassandra; this is easy through CQL using the ‘COPY tablename FROM filename’ CQL statement.
  3. Merge the staging table data with a live table to create a carbon copy of our MySQL table content.

For the loading portion, what we’ll do is load the CSV into a staging table, and then we’ll merge the staging table and live table data together during the commit stage of our batch applier. We’ll return to this in more detail.

For the merging, we’ll take the information from the staging table, which includes the sequence number and operation type, and then write the ‘latest’ version of that row and put it into the live table. That gives us a structure like this:

Cassandra Loader

Tungsten Replicator is going to manage this entire process for us – all we need to do is install the replicators, plug in these custom bits, and let it run.

As with the Hadoop applier, what we’re going to do is use the batch applier to generate only insert and delete rows; UPDATE statements will be converted into a delete of the original version and insert of the new version. So:

INSERT INTO sample VALUES (1,'Message')

Is an insert…

DELETE FROM sample WHERE id = 1

Is a delete, and:

UPDATE sample SET message = 'Now you see me' WHERE id = 1

is actually:

DELETE FROM sample WHERE id = 1
INSERT INTO sample VALUES (1,'Now you see me')

This gets round the problem of doing updates (which in big data stores are expensive, particularly Hadoop, which doesn’t support updating existing data) by turning each one into a more efficient delete and insert.
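As a rough sketch of that conversion, the Ruby below turns a single row change into the delete and insert operations described above. The hash layout (`:type`, `:before`, `:after`) and the method name are assumptions made for this example; they are not the replicator's internal format.

```ruby
# Illustrative only: the hash layout and method name are assumptions,
# not the replicator's internal row format.
def to_staging_ops(change)
  case change[:type]
  when :insert
    [{ optype: 'I', row: change[:after] }]
  when :delete
    [{ optype: 'D', row: change[:before] }]
  when :update
    # An update becomes a delete of the old row, then an insert of the new one
    [{ optype: 'D', row: change[:before] },
     { optype: 'I', row: change[:after] }]
  else
    raise ArgumentError, "unknown change type: #{change[:type].inspect}"
  end
end

update = { type: :update,
           before: { id: 1, message: 'Message' },
           after:  { id: 1, message: 'Now you see me' } }

to_staging_ops(update).each do |op|
  puts "#{op[:optype]} id=#{op[:row][:id]} message=#{op[:row][:message]}"
end
```

The update produces two operations, a 'D' carrying the old row followed by an 'I' carrying the new one, so the target never has to modify a row in place.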

In the CSV data itself, this is represented by prefixing every row with three fields:

optype, sequence number, unique id

Optype is ‘D’ for a delete and ‘I’ for an insert, and is used to identify what needs to be done. The sequence number is the unique transaction ID from the replicator THL. This number increases by one for every transaction, which means we can always identify the ‘latest’ version of a row; that is important to us when processing the transaction into Cassandra. The unique ID is the primary key (or compound key) from the source data. We need this to ensure we update the right row. To replicate data in this way, we must have a primary key on the data. If you don’t have primary keys, you are probably in a world of hurt anyway, so it shouldn’t be a stretch.
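A small Ruby sketch shows how those prefix fields can be used to find the latest version of a row. It assumes each CSV line is laid out as optype, sequence number, unique ID, followed by the row's own columns, and that no field contains an embedded comma or quoting; `parse_staged_line` is a name invented for this example.

```ruby
# Assumed layout: optype,seqno,unique id, followed by the row's own columns.
# Splitting on ',' assumes no field contains an embedded comma or quotes.
def parse_staged_line(line)
  optype, seqno, key, *data = line.chomp.split(',')
  raise ArgumentError, "bad optype: #{optype.inspect}" unless %w[I D].include?(optype)
  { optype: optype, seqno: Integer(seqno), key: key, data: data }
end

lines = [
  "D,101,1,1,Message",
  "I,101,1,1,Now you see me",
  "I,102,1,1,Now you don't"
]

rows = lines.map { |line| parse_staged_line(line) }

# The insert with the highest sequence number is the latest version of the row
latest = rows.select { |r| r[:optype] == 'I' }.max_by { |r| r[:seqno] }
puts "latest insert for key #{latest[:key]} is seqno #{latest[:seqno]}"
```

Here the delete and first insert share sequence number 101 because they come from the same update, and the insert at 102 wins as the most recent version.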

One difficulty here is that we need to cope with an idiosyncrasy of Cassandra, which is that by default, Cassandra orders fields in the ‘tables’ (really collections of key/values) so that integers and numbers appear first in the table, and text appears last. This is an optimisation that Cassandra makes that complicates things for us, but only in a very small way. For the moment, we’ll handle it by assuming that we are loading only one table with a known format into Cassandra. We could handle multiple tables by using a simple IF statement in the JS and using different formats for that, or we could actually extract the info from the incoming data; I’m going to skip that because it keeps us away from the cool element of actually getting the data in.

Within Cassandra then we have two tables, the table we are loading data into, and the staging table that we load the CSV data into. For our sample, the live schema is ‘sample’, the live table is ‘sample’ and the staging table is ‘staging_sample’.

The definitions for these in Cassandra are as follows. For the sample live table:

 CREATE TABLE sample (
 id int,
 message text,
 PRIMARY KEY (id)
 ) WITH
 bloom_filter_fp_chance=0.010000 AND
 caching='KEYS_ONLY' AND
 comment='' AND
 dclocal_read_repair_chance=0.000000 AND
 gc_grace_seconds=864000 AND
 index_interval=128 AND
 read_repair_chance=0.100000 AND
 replicate_on_write='true' AND
 populate_io_cache_on_flush='false' AND
 default_time_to_live=0 AND
 speculative_retry='99.0PERCENTILE' AND
 memtable_flush_period_in_ms=0 AND
 compaction={'class': 'SizeTieredCompactionStrategy'} AND
 compression={'sstable_compression': 'LZ4Compressor'};

And for the staging_sample table:

CREATE TABLE staging_sample (
 optype text,
 seqno int,
 fragno int,
 id int,
 message text,
 PRIMARY KEY (optype, seqno, fragno, id)
 ) WITH
 bloom_filter_fp_chance=0.010000 AND
 caching='KEYS_ONLY' AND
 comment='' AND
 dclocal_read_repair_chance=0.000000 AND
 gc_grace_seconds=864000 AND
 index_interval=128 AND
 read_repair_chance=0.100000 AND
 replicate_on_write='true' AND
 populate_io_cache_on_flush='false' AND
 default_time_to_live=0 AND
 speculative_retry='99.0PERCENTILE' AND
 memtable_flush_period_in_ms=0 AND
 compaction={'class': 'SizeTieredCompactionStrategy'} AND
 compression={'sstable_compression': 'LZ4Compressor'};

I’ve put both tables into a ‘sample’ keyspace.

Remember that idiosyncrasy I mentioned? Here it is: a bare table loaded from CSV will actually order the data as:

seqno,uniqno,id,optype,message

This is Cassandra’s way of optimising integers over text to speed up lookups, but for us it is a minor niggle. Right now, I’m going to handle it by assuming we are replicating only one schema/table and that we know what the structure of that looks like. Longer term, I want to pull it out of the metadata, but that’s a refinement.

So let’s start by having a look at the basic JS loader script. It’s really the component that handles the core of the work: managing the CSV files that come in from the batch engine and applying them into Cassandra. Remember, there are five functions that we can define, but for the purposes of this demonstration we’re going to use only two of them: apply(), which will load the CSV file into Cassandra, and commit(), which will perform the steps to merge the staging data.

The apply() function does two things: it identifies the table and schema, and then runs the command to load this data into Cassandra through the cqlsh command-line tool. We actually can’t run CQL directly from this command line, but I wrote a quick shell script that pipes CQL from the command line into cqlsh.

The commit() function on the other hand is simpler, although it does a much more complicated job using another external script, this time written in Ruby.

So this gives us a cassandra.js script for the batch applier that looks like this:

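// schema and table are deliberately kept as globals: commit() reuses
// the schema name recorded by the most recent apply() call.
var schema, table;

function apply(csvinfo)
{
  // Locate the CSV file that the batch applier has just written
  var sqlParams = csvinfo.getSqlParameters();
  var csv_file = sqlParams.get("%%CSV_FILE%%");
  schema = csvinfo.schema;
  table = csvinfo.table;
  // Load the CSV file into the staging table through the cqlsh wrapper script
  runtime.exec("/opt/continuent/share/applycqlsh.sh " + schema + ' "copy staging_' + table + " (optype,seqno,uniqno,id,message) from '" + csv_file + "';\"");
}

function commit()
{
  // Merge the staging data into the live table
  runtime.exec("/opt/continuent/share/merge.rb " + schema);
}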

So, the apply() function is called for each event as written into the THL from the MySQL binary log, and the CSV file generated at that point contains the contents of the THL event. If it’s one row, it’s a one-row CSV file; if it’s a statement or transaction that created 2000 rows, it’s a 2000-row CSV file.

The csvinfo object that is provided contains information about the batch file that is written, including, as you can see here, the schema and table names, and the sequence number. Note that we could, at this point, pull out table info, but we’re going to concentrate on pulling a single table here just for demo purposes.

The CQL for loading the CSV data is:

COPY staging_tablename (optype,seqno,uniqno,id,message) from 'FILENAME';

This says, copy the specific columns in this order from the file into the specified table. As I mentioned, currently this is hard coded into the applier JS, but would be easy to handle for more complex schemas and structures.
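One way to move beyond the hard-coded column list is a per-table lookup. The Ruby sketch below only illustrates the idea; the real applier script is JavaScript, and the `staging_columns` map, the `orders` table and the `copy_statement` helper are all hypothetical.

```ruby
# Hypothetical per-table column order; the 'orders' entry is an invented
# example, and nothing here is read from Cassandra's metadata.
def staging_columns
  {
    'sample' => %w[optype seqno uniqno id message],
    'orders' => %w[optype seqno uniqno id customer total]
  }
end

# Build the COPY statement for a table's staging table
def copy_statement(table, csv_file)
  columns = staging_columns.fetch(table) do
    raise ArgumentError, "no column order known for table #{table}"
  end
  "COPY staging_#{table} (#{columns.join(',')}) FROM '#{csv_file}';"
end

puts copy_statement('sample', '/tmp/sample.csv')
# prints: COPY staging_sample (optype,seqno,uniqno,id,message) FROM '/tmp/sample.csv';
```

Raising on an unknown table is a deliberate choice in this sketch, so that a table with no known column order fails loudly rather than loading columns in the wrong order.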

The commit() function is even simpler, because it just calls a script that will do the merging for us – we’ll get to that in a minute.

So here’s the script that applies an arbitrary CQL statement into Cassandra:

#!/bin/bash
# First argument is the keyspace; the remaining arguments form the CQL statement
SCHEMA=$1; shift
echo "$*" | cqlsh -k "$SCHEMA" tr-cassandra2

Really simple, but gets round a simple issue.

The script that does the merge work is more complex; in other environments we might be able to do this all within SQL, but CQL is fairly limited, with no sub-queries. So we do it long-hand using Ruby. The basic sequence is quite simple, and is in three phases:

  1. Delete every row mentioned in the staging table with an optype of D with a matching unique key
  2. Insert the *last* version of an insert for each unique ID – the last version will be the latest one in the output. We can pick this out by just iterating over every insert and picking the one with the highest sequence number as generated by the THL transaction ID.
  3. Delete the content from the staging table because we’ve finished with it. That empties the staging table ready for the next set of transactions.

That file looks like this:

#!/usr/bin/ruby

require 'cql'

# Connect to Cassandra and select the keyspace that holds both tables
client = Cql::Client.connect(hosts: ['192.168.1.51'])
client.use('sample')

# Phase 1: delete every live row that has a 'D' entry in the staging table
rows = client.execute("SELECT id FROM staging_sample where optype = 'D'")

deleteids = Array.new()

rows.each do |row|
  puts "Found ID #{row['id']} has to be deleted"
  deleteids.push(row['id'])
end

# Only issue the delete when there is something to remove,
# so we never send an empty IN () list
unless deleteids.empty?
  deleteidlist = deleteids.join(",")
  puts("delete from sample where id in (#{deleteidlist})")
  client.execute("delete from sample where id in (#{deleteidlist})")
end

# Phase 2: keep only the insert with the highest seqno for each ID
rows = client.execute("SELECT * FROM staging_sample where optype = 'I'")

updateids = Hash.new()
updatedata = Hash.new()

rows.each do |row|
  id = row['id']
  puts "Found ID #{id} seq #{row['seqno']} has to be inserted"
  if updateids[id].nil? || updateids[id] < row['seqno']
    updateids[id] = row['seqno']
    # Strip the staging-only columns so only live table columns remain
    row.delete('seqno')
    row.delete('fragno')
    row.delete('optype')
    updatedata[id] = row
  end
end

updatedata.each do |rowid,rowdata|
  puts "Should update #{rowdata['id']} with #{rowdata['message']}"
  collist = rowdata.keys.join(',')
  substbase = Array.new()
  rowdata.values.each do |value|
    if value.is_a?(String)
      # Quote strings, doubling any embedded single quotes for CQL
      substbase.push("'" + value.gsub("'", "''") + "'")
    else
      substbase.push(value)
    end
  end

  substlist = substbase.join(',')

  puts('Column list: ',collist)
  puts('Subst list: ',substlist)
  cqlinsert = "insert into sample ("+collist+") values ("+substlist+")"
  puts("Statement: " + cqlinsert)
  client.execute(cqlinsert)
end

# Phase 3: empty the staging table ready for the next set of transactions
client.execute("delete from staging_sample where optype in ('D','I')")

Again, currently, this is hard coded, but I could easily have got the schema/table name from the JS batch applier – the actual code is table agnostic and will work with any table.
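The merge phases can also be tried out without a Cassandra connection. The sketch below is an in-memory model under stated assumptions: plain Ruby hashes stand in for the live and staging tables, and `merge_staging` is a name made up for this example rather than part of the script above.

```ruby
# In-memory model of the merge; plain hashes stand in for the Cassandra tables.
# live:    { id => row hash }
# staging: array of hashes with 'optype', 'seqno', 'fragno', 'id' plus data columns
def merge_staging(live, staging)
  merged = live.dup

  # Phase 1: remove every row that has a 'D' entry in staging
  staging.select { |r| r['optype'] == 'D' }
         .each   { |r| merged.delete(r['id']) }

  # Phase 2: for each id, keep only the insert with the highest seqno
  staging.select { |r| r['optype'] == 'I' }
         .group_by { |r| r['id'] }
         .each do |id, inserts|
    newest = inserts.max_by { |r| r['seqno'] }
    merged[id] = newest.reject { |k, _| %w[optype seqno fragno].include?(k) }
  end

  # Phase 3 (emptying staging) is left to the caller
  merged
end

live = { 1 => { 'id' => 1, 'message' => 'Message' } }
staging = [
  { 'optype' => 'D', 'seqno' => 10, 'fragno' => 0, 'id' => 1, 'message' => 'Message' },
  { 'optype' => 'I', 'seqno' => 10, 'fragno' => 0, 'id' => 1, 'message' => 'Now you see me' },
  { 'optype' => 'I', 'seqno' => 11, 'fragno' => 0, 'id' => 2, 'message' => 'Second row' }
]

result = merge_staging(live, staging)
puts result[1]['message']   # prints: Now you see me
puts result[2]['message']   # prints: Second row
```

Like the sequence it models, this applies all deletes before any inserts. An insert followed by a later delete of the same ID within one batch would therefore leave the row in place; comparing sequence numbers across both operation types would be one way to handle that case.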

So, I’ve set up two replicators – one uses cassandra.js rather than hadoop.js but works the same way – and copied applycqlsh.sh and merge.rb into /opt/continuent/share.

And we’re ready to run. Let’s try it:

mysql> insert into sample values (0,'First Message');
Query OK, 1 row affected (0.01 sec)

We’ve inserted one row. Let’s go check Cassandra:

cqlsh:sample> select * from sample;

id  | message
-----+---------------
489 | First Message

Woohoo – data from MySQL straight into Cassandra.

Now let’s try updating it:

mysql> update sample set message = 'Updated Message' where id = 489;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

And in Cassandra:

cqlsh:sample> select * from sample;

id  | message
-----+-----------------
489 | Updated Message

Bigger woohoo. Not only am I loading data directly into Cassandra, but I can update it as well. Now I can have a stream of updates and information within MySQL replicated over to Cassandra for whatever analysis or information I need, without any issues.

Cool huh? I certainly think so (OK, but I’m biased).

Now I haven’t tested it, but this should just as easily work from Oracle; I’ll be testing that and let you know.

Any other database destinations people would like to see for replicating into? If so, let me know and I’ll see what I can do.

Process complex text for information mining

My latest article on data mining text information is now available:

Text, an everyday component of nearly all social interaction, social networks, and social sites, is difficult to process. Even the basic task of picking out specific words, phrases, or ideas is challenging. String searches and regex tools don’t suffice. But the Annotation Query Language (AQL) within IBM InfoSphere® BigInsights™ enables you to make simple and straightforward declarative statements about text and convert that into easily manageable data chunks. Learn how AQL and InfoSphere BigInsights can process text into meaningful data and find out how to convert that information into something usable within the BigSheets environment to get statistical and visualized data from the raw material.

Read Process complex text for information mining.

Building flexible apps from big data sources

My article on how to build flexible apps on top of the BigInsights platform has been published. This demonstrates a cool way to combine some client-end JavaScript and existing technologies to build a Big Data query interface without developing a specialised application for the purpose.

It’s no secret that a significant proportion of the needs for big data have come from the explosion in Internet technologies. Up until 10-20 years ago, the idea of a public-facing application having more than a few million users was unheard of. Today, even a modest website can have millions of users, and if it’s active, can generate millions of data items every day. The irony is that the very infrastructure and systems that create big data can also work in reverse, and provide some of the better ways to integrate and work with that data. Usefully, InfoSphere® BigInsights™ comes with support for managing and executing data jobs through a simple REST API. And through the Jaql interface, we can run queries and get information directly from a Hadoop cluster. This article looks at how these systems work together to give you a rich basis for capturing data and provide an interface to get the information back out again.

Building flexible apps from big data sources.

Process big data with Big SQL in InfoSphere BigInsights

The ability to write an SQL statement against your Big Data stored in Hadoop provides some much needed flexibility. Sure, using Hive or HBase you can perform some of those operations, but there are other alternatives that may suit your needs better, such as the Big SQL utility. My latest article on this tool is provided here:

SQL is a practical querying language, but it has limitations. Big SQL enables you to run complex queries on non-tabular data and query it with an SQL-like language. The difference with Big SQL is that you are accessing data that may be non-tabular, and may in fact not be based upon a typical SQL database structure. Using Big SQL, you can import and process large volume data sets, including by taking the processed output of other processing jobs within InfoSphere BigInsights™ to turn that information into easily query-able data. In this article, we look at how you can replace your existing infrastructure and queries with Big SQL, and how to take more complex queries and convert them to make use of your Big SQL environment.

Process big data with Big SQL in InfoSphere BigInsights.