Friday, 18 September 2009

Trying out mapreduce - on the farm

Received an email this week from Sanger helpdesk that they installed a test hadoop system on the farm with 2 nodes. Thanks guys! First thing to do, obviously, was to repeat the streaming mapreduce exercise I did on my own machine (see my previous post). Only difference with my local setup is that this time I had to handle HDFS.

As a recap from my previous post: I'll be running an equivalent of the following:

cat snps.txt | ruby snp_mapper.rb | sort | ruby snp_reducer.rb

Setting up
First thing the Sanger wiki told me was to format my HDFS space:
hadoop namenode -format
This apparently only affects my own space... After that I could start playing with hadoop.

Where am I?
It looks like hadoop installs its own complete filesystem: even if my path on the server would be /home/users/a/aerts, a
hadoop fs -lsr /
shows that in the HDFS system I'm at /user/aerts.

Preparing the run
First off: create a directory to work in. Let's call that "locustree" with two subdirectories, called "input" and "output".
hadoop fs -mkdir locustree
hadoop fs -mkdir locustree/input
hadoop fs -mkdir locustree/output
And copy your datafile to the input directory:
hadoop fs -put snps.txt locustree/input/
Running the run
Hadoop documentation mentions that any shebang line in the mapper and reducer scripts are likely to not work, so you have to call ruby explicitely. Provide both scripts as "-file" arguments to hadoop. Finally, from your local directory containing the scripts, run the following command to run the mapreduce job:
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.1-streaming.jar \
-input locustree/input/snps.txt \
-mapper "/usr/local/bin/ruby snp_mapper.rb" \
-reducer "/usr/local/bin/ruby snp_reducer.rb" \
-output locustree/output/snp_index \
-file snp_mapper.rb \
-file snp_reducer.rb

Et voila: a new directory snp_index now contains the file spit out by the snp_reducer.rb script.

Wednesday, 2 September 2009

Trying out mapreduce

learning to map/reduce
Photo by niv available from Flickr

I have long been interested in trying out mapreduce in my data pipelines. The Wellcome Trust Sanger Institute has several huge compute farms that I normally use, but they don't support mapreduce jobs. Quite understandable from the IT's and institute's point of view because it's a mammoth task to keep those things running. But it also means that I can't put a foot on the mapreduce path.

There are options to run mapreduce on your own, however. One is to install hadoop locally with the restriction that you can only use one node: your own machine. This walkthrough will get you started with that. Another approach is to use Amazon Elastic MapReduce (AWS EMR).

First of, I must admit that I'm far from familiar with mapreduce at this point in time. As far as I understand it's ideal for aggregating data when the algorithm can be run on a subset of the data as well as the complete dataset. The mapping step of the framework will create different subsets of that data, run the required program on it and return partial results. These results should be in the form of key/value-pairs and will serve as the input for a reduce step that combines all intermediate results into one.

What's held me back from using AWS EMR is the cost. It's practically nothing, but anything I run on their servers at this moment is paid for with my own money. So today I installed hadoop locally and started coding.

The pilot
One of the projects I'm working on is LocusTree: a way of indexing genomic features that allows for quick visualization at different resolutions. In this data structure the genome is first divided into bins of 4,000 bp (called level 0). The next step sees the creation of new "parent" bins that each hold 2 of the original ones (so each covering 8,000 bp; called level 1). You keep on grouping 2 bins into a parent bin until you end up with a single bin for a complete chromosome. For chromosome 1 that means 17 levels, I think.

Each of the features - for example a list of 15 million SNPs or log2ratios for 42 million probes - is put in the smallest node in which it fits entirely. A feature ranging from position 10 to 20 will fit in the first bin on level 0; but a feature from position 3,990 to 4,050 will have to go in the first bin on level 1. Subsequently a count and if applicable some aggregate value like sum, min or max is stored in that node as well as in every parent node.


So for each SNP/probe the indexing script has to find the smallest enclosing node and update the aggregate values for each parent up to level 3.

The setup
I'm not good at java so used the hadoop streaming approach for this indexing. The input file contains about 15 million SNPs and looks like this:

snp6216929   3   177718011   177718021
snp6216930 3 177718267 177718277
snp6216931 3 177718268 177718278
snp6216932 3 177718294 177718304
snp6216933 3 177718612 177718622
snp6216934 3 177718629 177718639
snp6216935 3 177718956 177718966
snp6216936 3 177719529 177719539
snp6216937 3 177719623 177719633
I want to get a list of nodes with the count of SNPs within that node and all child-nodes. Each node should also list the SNP names for which that node is the smallest enclosing node. So the output looks like:
3:0:44429   9   snp6216929,snp6216930,snp6216931,snp6216932,snp6216933,snp6216934,snp6216935,snp6216936,snp6216937
3:0:44430 1 snp6216938
3:1:22214 9
3:1:22215 1
3:2:11107 10
3:3:5553 10
3:4:2776 10
3:5:1388 10
3:6:694 10
3:7:347 10
3:8:173 10
3:9:86 10
3:10:43 10
3:11:21 10
3:12:10 10
3:13:5 10
3:14:2 10
3:15:1 10
3:16:0 10
3:17:0 10
First column is the node (chromosome - level number - start position), second column is the number of SNPs covered by that node, and third column lists the SNP names for the smallest enclosing nodes.

The scripts
Code for the mapper:
#!/usr/bin/ruby
CHROMOSOME_LENGTHS = {'1' => 247249719,
'2' => 242951149,
'3' => 199501827,
'4' => 191273063,
'5' => 180857866,
'6' => 170899992,
'7' => 158821424,
'8' => 146274826,
'9' => 140273252,
'10' => 135374737,
'11' => 134452384,
'12' => 132349534,
'13' => 114142980,
'14' => 106368585,
'15' => 100338915,
'16' => 88827254,
'17' => 78774742,
'18' => 76117153,
'19' => 63811651,
'20' => 62435964,
'21' => 46944323,
'22' => 49691432,
'23' => 154913754,
'24' => 57772954
}

def enclosing_node(chr,start,stop)
start = start.to_i
stop = stop.to_i
level_number = ((Math.log(CHROMOSOME_LENGTHS[chr]) - Math.log(4000)).to_f/Math.log(2)).floor + 1
resolution_at_level = 4000*(2**level_number)
previous_start_bin = 0
previous_level_number = level_number
start_bin = (start-1).div(resolution_at_level)
stop_bin = (stop-1).div(resolution_at_level)

ancestors = Array.new
while start_bin == stop_bin and level_number >= 0
ancestors.push([chr, level_number + 1, previous_start_bin].join(":"))
previous_start_bin = start_bin
previous_level_number = level_number
level_number -= 1
resolution_at_level = 4000*(2**level_number)
start_bin = (start-1).div(resolution_at_level)
stop_bin = (stop-1).div(resolution_at_level)
end
return [[chr, level_number + 1, previous_start_bin].join(":"), ancestors.reverse].flatten
end

STDIN.each do |line|
name, chr, start, stop = line.chomp.split(/\t/)

encl_node, *ancestors = enclosing_node(chr,start,stop)
STDOUT.puts [encl_node, 1, name].join("\t")
ancestors.each do |node|
STDOUT.puts [node, 1].join("\t")
end
end
Don't worry about the details. It basically takes the input from standard input and writes this to standard output:
3:0:44429   1   snp6216929
3:1:22214 1
3:2:11107 1
3:3:5553 1
3:4:2776 1
3:5:1388 1
3:6:694 1
3:7:347 1
3:8:173 1
3:9:86 1
3:10:43 1
3:11:21 1
3:12:10 1
3:13:5 1
3:14:2 1
3:15:1 1
3:16:0 1
3:17:0 1
3:0:44429 1 snp6216930
3:1:22214 1
3:2:11107 1
3:3:5553 1
...
The reducer script takes all this and summarizes it all:
#!/usr/bin/ruby
nodes = Hash.new
STDIN.each do |line|
node, count, ids = line.chomp.split(/\t/)
parsed_ids = ids.split(/,/) unless ids.nil?
if nodes.has_key?(node)
nodes[node][:count] += count.to_i
nodes[node][:accessions].push(parsed_ids) unless ids.nil?
else
nodes[node] = Hash.new
nodes[node][:count] = count.to_i
nodes[node][:accessions] = Array.new
nodes[node][:accessions].push(parsed_ids) unless ids.nil?
end
end
nodes.keys.each do |locus|
STDOUT.puts [locus, nodes[locus][:count], nodes[locus][:accessions].flatten.join(',')].join("\t")
end
Before running this under hadoop it's usefull to just manually pipe these together on the unix command line:
cat snps.txt | ruby snp_mapper.rb | ruby snp_reducer.rb | sort

Because this was the output I expected I could run the pipeline with hadoop:
$HADOOP_HOME/bin/hadoop
jar $HADOOP_HOME/contrib/streaming/hadoop-0.18.3-streaming.jar
-input snps.txt
-mapper snp_mapper.rb
-reducer snp_reducer.rb
-output ./snp_index/
-file snp_mapper.rb
-file snp_reducer.rb


...and lo-and-behold: the snp_index directory contains the result I want.

Parting thoughts
This is the very first time I use mapreduce so the code is far from optimized. Note that the code above is not a reflection of my programming skills :-) I just wanted to focus on the hadoop pipeline rather than the code. Actually, running this thing on my own laptop lasted almost 4 hours and nearly brought it to its knees. It certainly beat it up badly: that hash in the reducer script is just way too big... This little mapreduce experiment is finished, but I might have another go later and optimize the mapper and reducer scripts (getting rid of that huge hash), or create some intermediate reducer step.

Unfortunately it's only using one compute node (my own laptop) so I couldn't check how fast it can be. Running a small subset on Amazon Elastic MapReduce shows that that should work as well.

I only kept track of the count as an aggregate in the nodes. In case I wanted to calculate sum, min and max, I'd probably use a json-formatted string in the key/value pairs:
{"count": 4, "sum": 17, "min": 2, "max": 12}


I will definitely use mapreduce in the future, given the premise that my work would either install it on a cluster locally or we get an institute account on AWS.

Wednesday, 8 July 2009

First test release of circular genome browser



Worked a couple of days on pARP, the circular genome browser, and I think it's ready to be tested out by others. Consider this an alpha release: expect a lot of issues. It's easy to create regions with a negative length, for example. Also, I didn't focus yet on user-friendliness or general input files. Ways of interaction are not made clear to new users yet and the input files still need to have fixed names and be stored in a particular folder.

pARP is designed to be a genome browser for features that are linked to other features on a genome (e.g. readpair mappings). Using a circular display, lines can be drawn connecting these features.

pARP always shows the whole genome. You can zoom into selected regions but the rest is still shown albeit squeezed a bit more together. The reason for this is that I want to show the context at all times. Suppose you'd zoom into two regions A and B that are linked by a large number of readpairs. If the part of the genome that is not A or B is not shown any readpair that has only one of its reads in A or B will just not be shown. By showing the whole genome, even squeezed in a few pixels, you can at least see that some reads are linked outside of A and B.



I've put some information on the github wiki page, such as how to interact and what the datafiles should look like.

For a little taste: here's a very brief screencast:



A lot of things still need to happen:

  • Catch a lot of edge cases
  • Incorporate a library for fast loading of features (i.e. LocusTree, which doesn't exist yet)
  • Make interaction more straightforward: use mouse for panning/zooming for example
  • About 1,472 other things that I currently forget
Also: I'm looking for a new name for pARP. pARP stands for "processing abnormal readpairs" (which what is was meant for originally), but it's actually just a genome browser using a circular representation to show linked features. Suggestions I already got are encircle and SqWheel or Squeal (the last two based on sequence-wheel; Squeal was my own idea, so I like that most at the moment :-) ).

A very, very big thanks goes to Jeremy Ashkenas, the author of ruby-processing. With pARP I have been pushing the boundaries of what that library does, and he has adapted it for my needs as I went. See here for his ruby-processing library. Other thanks go to my colleagues Erin, Klaudia, Jon, Nelo and Chris for their ideas.

pARP can be downloaded or cloned from github. Mac, Windows and linux are available there as well.

Thursday, 16 April 2009

LocusTree - searching genomic loci

"Contigs should not know where they are." That's a phrase uttered by James Bonfield when presenting his work on gap5, the successor to gap4, a much-used assembly software suite. So you think: "Wait a second: you're talking about assembly, and the contigs should not store their position?"

This statement addresses a problem that we encounter often when working with genomic data: how to handle features. The approach often used is to give the feature a 'chromosome', 'start position' and 'stop position'. Seems reasonable, right? So if you want all features on chromosome 1 between positions 6,124,627 and 6,827,197 you just loop over all features and check if their range overlaps with this query range. Indeed: seems reasonable. Unless your collection of features goes into the millions. Suppose you have arrayCGH data with a resolution of 1 datapoint per 50 basepairs (the green/red bars in the picture). If you'd want to search for this region, you can focus on chromosome 1 (if you were so smart to create different collections per chromosome) and start comparing the start and stop position of each feature from the beginning. Chromosome 1 is almost 250Mb so would contain 5 million array probes. That's a lot of features to check.



What James alluded to, was that you should retain the position information at a higher level. He uses an adaption of an R-Tree, which is a datastructure that bins all raw data into bins of a certain size (say 25 elements). Those bins themselves are binned again into larger bins, and so forth until only one bin remains: the root. Each bin knows its start and stop position relative to the bin it's contained in. To search for the same region as above, you start at the root bin which (by definition) overlaps with that query. So you go to the next level and check each of the child bins of root. You can reject those that don't overlap with your range (the red bins) and only focus on the ones that do. Continue until you reach the bottom-most layer which contains your actual features.



This has two advantages for my work on pARP, the visualization tool for aberrant read pair mappings. First of all, it's faster to get those features in a given region. A major feature of the pARP tool is that you can zoom into any region. With more than 6 million values for readdepth, speed is a big issue. But that's not all. Think about this: if I have a screen resolution of 1200 pixels wide, why would I try and plot 6 million points next to each other? Twelve-hundred would be enough, isn't it? R-Tree to the rescue again. In building the tree, each bin stores some aggregate data from its members; for example the average readdepth. To draw the genome readdepth across the whole genome, I only have to start at the root of the LocusTree and go down to that level that contains 1200 bins. No need to load the complete dataset. At the moment LocusTree bins only take the average of whatever value its members have, but this can be expanded to contain any type of aggregation (e.g. number of objects).



LocusTree is not the fastest library around (being written in ruby, and by me), but I think it does what I need it to do. This means that it might not do exactly what you need it to do. But the code is at github, so feel free to fork and improve. I did add a list of to-dos at the bottom of the README file...

UPDATE: I have now realized that - due to DataMapper - this library does not work under jruby, which is necessary for pARP which uses ruby-processing. A better approach than the above would be to have C-based algorithms combined with an API, so that we don't need DataMapper.

Reblog this post [with Zemanta]

Sunday, 8 March 2009

The good and bad of genome viewers

Back before the human genome was fully sequenced and NCBI, UCSC and Ensembl started working on visualization, it made a lot of sense to go for linear representations and use tracks for annotation. After all: chromosomes are linear. Using different tracks to show different types of annotation is the next logical step.

But there is not just one human genome on earth; according to Wikipedia there's about 6.76 billion copies as of March 2009. So instead of talking about "the human genome" in those browsers, we talk about "the reference genome". Each person on earth is different, and so is each human genome. (That putting the reasoning on its head, but never mind).

Differences between humans such as SNPs and microsatellites can still be shown in the track-based browsers.

Things get more difficult when you're looking at structural variation. Structural variation messes up the backbone of the linear genome browser: you can't show differences between individuals in one straight line. Suppose you want to investigate a copy-number variation (CNV) and consult UCSC. You'd find tracks such as this:



Although this does give you quite some information on the CNV in question, it's not an adequate representation of what the different alleles actually look like. It also highlights another issue: the concept of "the reference genome". As more and more genomes are getting sequenced, is the one that was picked first the best for visualization and indeed, the reference? To be able to handle the different MHC haplotypes in Ensembl, for example, the database contains a table called "assembly_exceptions" that contains the alternative assemblies for each haplotype.



I believe that further down the line (although it might be quite a while) we might need to forget the whole notion of a reference genome. Two options come to mind. First of all, we could create an artificial reference that contains all sequence and let each real sequence we want to look at well, reference, that artificial assembly. That would mean that the different MHC haplotypes for example would all be in the same sequence. Similarly, copy-number variants containing let's say 3 to 8 copies would include all 8 in the mock-assembly. Unfortunately this still cannot cover structural variation like inter-chromosomal translocations. We can't build a single artificial assembly that would incorporate those. So here's the alternative: deBruijn graphs. Instead of creating a single linear representation of a reference, just let's not. We could use building blocks to build up each individual. Take a look at this picture:



Suppose that each block is a part of a chromosome and the red and blue lines represent the path to follow to build up the chromosome for a particular individual. In this picture the red individual misses a part of that chromosome that is present in the blue individual, and another part is inverted. Notice that we don't make any (arbitrary) decision on what is the reference sequence. By dragging the blocks we can either place all red connections on one line or all blue ones, making them look like a reference.

If we'd then add annotations to this picture like genes, we'd be able to display fusion genes. Suppose that the densely-striped block is on chromosome 7 in the red individual but on chromosome 12 in the blue one. If there's a gene on the right breakpoints we end up with a fusion gene.


Time permitting I'm going to investigate how useful this will be in projects like CNVs in the 1000genomes project.