Saturday, February 20, 2016

Distributed Systems and Orchestral Music

Writing for distributed systems and composing orchestral music

... too long for a tweet, too short for a real blog ...

Composing music for a single instrument, a la the Bach violin partitas or the Beethoven piano sonatas or the Berio sequenzas, requires expert knowledge in the targeted instrument. The composer must understand the dynamic range, the various timbres, the harmonic resonance, and many other facets of the instrument to create a truly compelling piece of music.

Similarly, writing expert-level software for a single machine requires one to understand things like disk IO, CPU caches and registers, kernel tuning, and the various ways in which those system building blocks can be efficiently employed together (composed!) to create a useful piece of software.

Moving out to the next level of abstraction, when composing music for an orchestra, the composer not only needs to understand each individual instrument, but also how to assemble them together to contribute to the broader musical goal. For example, listen to the Mahler symphonies or the Stravinsky ballets (The Rite of Spring, and so on). Similarly, in designing a distributed system, the engineer needs to understand not only how machines behave at the local, individual level, but also how the sum of them contributes to the overall distributed algorithm. Faulty disks, long virtual machine pauses, or other failures at the individual machine level, if not handled properly, can cause the distributed algorithm to fail or misbehave in unexpected ways.

To expand the subject space further, one could conceivably compare multi-datacenter distributed systems to something like Stockhausen's Gruppen or his Helikopter-Streichquartett (https://en.wikipedia.org/wiki/Helikopter-Streichquartett). Although at this point the analogies start to break down, and there aren't enough data-centers-in-deployed-helicopters to make a fair comparison. Music rules.

Tuesday, May 12, 2015

More thoughts on cassandra's gossip

Over the last few months I've been working out new ways to think and talk about cassandra's existing gossip system. This is an effort to help the cassandra community become more comfortable with this part of the database.

 

gossip is a verb


One of the more confusing aspects of cassandra's gossip system is the fact that two essential ideas are jammed together: cluster membership, and how that data is disseminated throughout the cluster. As an outsider looking in, it's very easy to conflate the two: nodetool has a "gossipinfo" command, which prints membership information. When members of the community (myself included, until very recently) speak of gossip, we say "what's in gossip" or "can you show me the gossip output" when what we're really interested in is "what is the state of the nodes in the cluster".

What I want to do is to separate these two concepts, membership and gossip. Gossip is a verb; it is a mechanism to disseminate information. Membership, on the other hand, is a noun; it is the metadata about nodes in the cluster. We can apply the verb to the noun (disseminate the node metadata), but we should recognize that the two are distinct.

 

gossip as repair (in the miniature)


Cassandra's standard mechanism for spreading user data across nodes is somewhat like this (highly simplified):

  • write path
    • coordinator gets request from client
    • determines nodes that own range
    • sends the writes to those nodes
  • repair (anti-entropy)
    • a node figures out all the other nodes that share the same ranges
    • they build up merkle trees for each range
    • send trees back to the initiator
    • diff the merkle trees
    • exchange any divergent data

(Note: I've left out hints and read-repair for clarity)
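
The repair flow above can be sketched in a few lines. This is a toy model, not cassandra's implementation: a plain per-range hash stands in for a real Merkle tree, and the class and method names are made up for illustration.

```java
import java.util.*;

// Toy anti-entropy: nodes exchange per-range digests (cheap stand-ins for
// Merkle trees) and only ship full data for ranges whose digests differ.
public class RangeRepair {
    // range -> user data for that range
    final Map<String, String> data = new HashMap<>();

    // Summarize each range as a digest (hashCode here; a real repair
    // builds a Merkle tree per range).
    Map<String, Integer> digests() {
        Map<String, Integer> d = new HashMap<>();
        for (Map.Entry<String, String> e : data.entrySet())
            d.put(e.getKey(), e.getValue().hashCode());
        return d;
    }

    // Diff two nodes' digests; only these ranges need full data exchanged.
    static List<String> divergentRanges(RangeRepair a, RangeRepair b) {
        Map<String, Integer> da = a.digests(), db = b.digests();
        Set<String> ranges = new TreeSet<>(da.keySet());
        ranges.addAll(db.keySet());
        List<String> diff = new ArrayList<>();
        for (String r : ranges)
            if (!Objects.equals(da.get(r), db.get(r))) diff.add(r);
        return diff;
    }
}
```

The point of the digest step is bandwidth: summaries are small, so matching ranges cost almost nothing to compare.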

Cassandra's gossip system is much like repair (simplified as well):
  • a node selects another peer (randomly)
  • the initiator sends a digest of its membership data
  • the peer sends any differences, and a digest of data it needs
  • initiator merges data from peer, and sends back any data the peer needs
  • peer merges data from initiator

Both repair and gossip perform anti-entropy, a periodic data reconciliation operation. They both send summaries (merkle trees or digests) of their data, and then send the full data if any divergence is detected. Thus, in a certain light, cassandra's gossip can be viewed as another form of repair; one that operates on cluster membership data instead of user data. The primary difference, then, between the dissemination of the two types of data is that user data has an initial broadcast or multicast operation when new or updated data is received; membership data is only disseminated via anti-entropy (the current gossip implementation).
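A single gossip exchange of the kind described above can be sketched like this. It's a toy: membership is reduced to a peer-to-version map, the digest/delta round trip is collapsed into one merge, and all names are invented for illustration.

```java
import java.util.*;

// Toy gossip round: membership is peer -> version number. Real gossip
// exchanges digests first and only ships the state behind newer versions;
// here we collapse that into a single merge that keeps the max version.
public class GossipRound {
    static Map<String, Integer> merge(Map<String, Integer> a, Map<String, Integer> b) {
        Map<String, Integer> out = new HashMap<>(a);
        for (Map.Entry<String, Integer> e : b.entrySet())
            out.merge(e.getKey(), e.getValue(), Math::max); // newer version wins
        return out;
    }

    // After one full exchange, initiator and peer converge to the same view.
    static void exchange(Map<String, Integer> initiator, Map<String, Integer> peer) {
        Map<String, Integer> merged = merge(initiator, peer);
        initiator.clear(); initiator.putAll(merged);
        peer.clear(); peer.putAll(merged);
    }
}
```

Run enough random pairwise exchanges like this and every node's membership map converges, which is exactly the anti-entropy property.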

Taking a step back, we can see cassandra's gossip as a full distributed system in its own right, living within the context of another distributed system.

Thursday, July 31, 2014

Gossip papers

In preparation for my talk at the Cassandra Summit 2014 ("Demystifying Gossip"), I've been going back and reading a lot of existing literature on gossip and epidemic broadcast protocols. What follows is simply a listing of all the papers being carried around in my bag at this time, plus a quick note or two that might be helpful to others. This is not a thoroughly exhaustive listing by any measure, but it mainly grew organically as I started off with one paper, then checked its references, read those, checked their references, ...

These papers aren't listed in any particular order, and any grouping is largely as I understand the problem at this time. There are other papers I'm sure I'm missing, and I haven't made it through all of these yet (and a few I need to reread, a few more times!). Also, the notes are probably somewhat subjective, but, meh, welcome to the internet.

Epidemic Algorithms for Replicated Database Maintenance - one of the first seminal papers describing gossip/epidemic protocols (1989). great read

The Peer Sampling Service: Experimental Evaluation of Unstructured Gossip-Based Implementations - First major paper to spell out the peer service in relation to gossip systems

Bimodal Multicast

HyParView - Hybrid Partial View, example of a peer sampling service (cares about membership & view maintenance only). maintains two partial views: active and passive; when peer from active is unreachable, a peer from passive view is promoted. special join logic when new peer tries to join.

Epidemic Broadcast Trees - Plumtree paper. Gossip overlay that uses a spanning tree. implemented in riak 2.0.

Thicket: A Protocol for Building and Maintaining Multiple Trees in a P2P Overlay - An extension of Plumtree that specifically optimizes for sender-based trees by having multiple trees over the cluster, and attempting to ensure a given peer is an interior node in only one of the trees.

Gossip-Based Broadcast - Nice overview of gossip systems written by HyParView/Plumtree authors

Correctness of a Gossip Based Membership Protocol - partial view alg, with a formal proof

SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol

Efficient Reconciliation and Flow Control for Anti-Entropy Protocols - adding anti-entropy to gossip exchanges. basis of cassandra's gossip subsystem

GEMS: Gossip-Enabled Monitoring Service for Heterogeneous Distributed Systems - attempts to introduce a consensus algorithm over gossip for determining some global notion of a peer's liveness; used as part of a monitoring service. each node emits a continuous heartbeat for failure detection on peers, but peers judge a target node by how many rounds of gossip it has been since they saw an update from that target, which is a (semi-)subjective measurement.

Large-Scale Newscast Computing on the Internet - describes the newscast protocol.

A Robust and Scalable Peer-to-Peer Gossiping Protocol - Followup to the Newscast paper, primarily deals with the implementation and testing the Newscast authors performed. not much info about the actual protocol, and no source code, either.

Spatial Gossip and Resource Location Protocols

Failure Detectors (not specifically gossip, per se, but close enough for my needs)

A Gossip-Style Failure Detection Service - not dissimilar to cassandra's basic notion of failure detection. based on random gossip and a heartbeat counter

The ϕ Accrual Failure Detector - outputs a suspicion level of a node based on the received heartbeat rate. a modified version is used in cassandra.
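
The core idea of the accrual paper fits in a few lines. This is a sketch only: the class and method names are mine, and it assumes exponentially distributed heartbeat inter-arrival times (the original paper uses a normal distribution; cassandra's modified version uses an exponential model).

```java
// Toy accrual failure detector: instead of a boolean up/down verdict,
// output a suspicion level (phi) that grows the longer a heartbeat is
// overdue relative to the historical mean interval.
public class AccrualDetector {
    // phi = -log10(P(heartbeat arrives later than t)), with
    // P(later than t) = exp(-t / meanInterval) under the exponential model.
    static double phi(double millisSinceLastHeartbeat, double meanIntervalMillis) {
        double pLater = Math.exp(-millisSinceLastHeartbeat / meanIntervalMillis);
        return -Math.log10(pLater);
    }
}
```

The caller picks a threshold (phi > 8, say) rather than a timeout, which is what makes the detector "accrual": suspicion accrues continuously instead of flipping at a fixed deadline.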

The Failure Detector Abstraction

Implementations

The two implementations I know best are those found in cassandra and riak. Serf is an implementation of the SWIM paper in golang, but I have never had a chance to check it out.


Wednesday, February 22, 2012

Cassandra Java client libraries

I'm keeping a running list of Cassandra Java client libraries, so it's not always going to be complete. If you have any additions, please ping me!
  • Hector
    • actively developed in the open source community
  • Astyanax
    • developed by Netflix
    • open sourced Feb 2012
  • Cassie 
    • written in Scala
    • developed by Twitter
    • open sourced Feb 2012
  • Cassandra-Hector-Wrapper 
    • A wrapper around Hector to simplify the access / object model
    • developed by zcourts
    • open sourced Feb 2012
  • Skeletor
    • written in Scala
    • A wrapper around Hector

Tuesday, October 19, 2010

Adventures with Squid caching - Vary header

Note: this entry is part of a series on RESTful web services, HTTP, and squid caching, all of which starts with an overview.

Background
The HTTP 1.1 RFC includes a lot of new features to support caching. One of the headers we found interesting was the Vary header. It allows an origin server to tell the caching proxy that the URI will have variants (different encodings, formats, etc) as different representations of the resource. Basically, it allows your service to say, "Hey, for a URI, I support JSON, XML, and PDF, or I can give that to you compressed or not". Of course the formats can be whatever you choose, and further you can vary on whatever else you want. The key is that the Vary header acts as kind of a pointer and says, "I'm listing other headers that are the variants, and they will enumerate the specifics of what's different".

Sounds interesting, but it's damn hard to visualize, so let's make an example. Let's say our service has the following resource URI for customer orders:

/customers/{cust_id}/orders

And that the service can return the order data in different formats (mime-type); for argument's sake we'll support JSON, XML and PDF. Thus, a typical HTTP invocation would look like this (using an abbreviated HTTP message format here):

c>
  GET /customers/123/orders HTTP 1.1
  Accept: application/json

s> 
  200 OK
  Vary: Accept
  Accept: application/json,application/xml, application/pdf
  Cache-Control: max-age=some_value_in_seconds

  { ... json data ...}


Then if another client came along and requested xml data for the same URI, the caching proxy would see that, while it has json in cache (assuming it's not stale), it does not have xml cached, and would forward the request to the origin server. The cache proxy knows to do this because of the Vary header that was returned from the first call, which informed it to allow variants on the incoming request Accept header. Those variants were declared in the HTTP response Accept header to the first request. To illustrate:


c>
  GET /customers/123/orders HTTP 1.1
  Accept: application/xml

s>
  200 OK
  Vary: Accept
  Accept: application/json,application/xml, application/pdf
  Cache-Control: max-age=some_value_in_seconds

<xml data ...... />

OK, so now you have two variants of your resource in the caching proxy (squid), but, interestingly, they will expire at different wall clock times. It is important to know that the two will not "invalidate together" when one variant's TTL expires; they essentially live independent lives. Let's take an example (while it's possible for the origin server to return different max-age values for the Cache-Control header, for sanity's sake I'll make it 1 hour across the board.):
  1. request 1 asks for json data at 10:00am; it will be live in cache until 11:00am
  2. request 2 asks for xml data at 10:59am; it will live in cache until 11:59am
If you are worried about freshness of data (at whatever time scale), this is something to be aware of in squid. (Note: I'll be covering resource invalidation in a future installment of this series.)
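What the proxy is doing under the hood can be sketched as a cache-key computation: the key is the URI plus the request's value for every header named in Vary, so each variant is stored and expires independently, exactly as in the json/xml example above. A minimal sketch (class and method names are mine, not squid's):

```java
import java.util.*;

// Toy Vary-aware cache key: URI plus the request's values for every header
// listed in the response's Vary header. Different Accept values produce
// different keys, so json/xml/pdf variants live (and expire) independently.
public class VaryCache {
    static String cacheKey(String uri, List<String> varyHeaders,
                           Map<String, String> requestHeaders) {
        StringBuilder key = new StringBuilder(uri);
        for (String h : varyHeaders)
            key.append('|').append(h).append('=')
               .append(requestHeaders.getOrDefault(h, ""));
        return key.toString();
    }
}
```

This also makes the independent-expiry behavior obvious: the two variants are simply two unrelated cache entries that happen to share a URI prefix.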

Lessons Learned
We initially introduced Vary because it allowed us to introduce new, more compact data representations without disturbing existing clients (that is, without forcing them to parse a new format). However, the cache miss rate due to different clients requesting different formats, coupled with the cross-country latency, became a problem. If the latency to generate each variant is nontrivial, whether because of expensive database reads or, in our case, cross-country/cross-data center calls, callers end up eating the pricey cost of a cache miss for their specific variant, even if the other variants are fresh in cache. Further, it becomes a burden on the service's engineers to figure out why some teams see "fast responses" (cache hits) and other teams see "your slow-ass service" (cache misses), as service owners need to keep a matrix of which team consumes which variant to do the rough math of how likely they are to get a cache hit.

If the latency per variant is inexpensive and the load on back-end resources is bearable, using Vary in your HTTP caching can open up some interesting data format possibilities and ways in which you can service clients.








Monday, October 18, 2010

Adventures with Squid caching - overview

I've been knee-deep in the REST services world for most of the last 12 months, and while that domain may be simpler to model and deal with than RPC or SOAP, one of the most compelling advantages is making use of HTTP caching. Instead of baking your own caching layer or application, using stateful server sessions, or, worst of all, forcing clients to use a fat-client jar that does its own caching (yes, I've committed that sin in the distant past), there are several open-source HTTP cache proxy applications that support the HTTP caching spec to differing degrees.
For our needs, we went with squid, as a team member had lots of experience and success with it at a previous gig.

Of course, we could have used a look-aside cache (something like memcached), but we were interested in having a write-through cache in front of the service to reduce hops and latency (I'll explain below).


Basic premise and early decisions
We were breaking up a massive www application into smaller standalone components/services for, ya know, the usual sorts of reasons: scalability, availability, no single points of failure, and so on. In addition to pulling apart legacy code and rethinking our data persistence options, we also opened up another data center - on the other side of the country. Our data itself wasn't going to be directly replicated to the new data center (not going into any details on this) as there were many hands in the pie that could not easily be pulled out, slapped, or cut off. To that end, we had to deal with applications deployed only in the new data center that needed the existing customer data. Enter cross-country latency - umm, yummy.
So now that you know some of the ground rules and constraints of the work, we decided early on to use REST as the premise/framework for building out our services. We wanted to stick as close to the HTTP RFC as possible, within reason, without just inventing a bunch of weird-ass rules around using our stuff. And, true to the title of this series, we wanted to use a reverse cache proxy to not only improve the basic performance of the service, but also to help relieve the cross-colo latency.

So, why is this interesting to you? I'm going to record my experiences with REST, squid, and the rest, and I'll point out what worked well, what didn't, and what was freaking painful to figure out, get working, or just plain abandon. I'll cover the following topics:
  • Vary (HTTP header)
  • ETag (HTTP header)
  • cache invalidation via PUT/POST/DELETE
  • tying them all together

Of course, as we went through building out everything in the service, it didn't all fall into neat, even buckets of functionality like the bullet points above. Like everything else when building out something new, it's a little bit of coding here, a little bit of configuration there, and some ITOps work to bundle it all up.

Oh, yeah, beer was involved, as well. Lots.

Wednesday, September 8, 2010

Emacs and sql-plus fun

Found this nugget to make sql-plus less painful:

On emacs, M-x sql-oracle does the trick

Tuesday, April 27, 2010

REST != !SOAP

OK, so we're heavy in recruitment time here at $DAYJOB, and I see lots of resumes that claim deep REST experience. However, when I probe candidates about building URIs, resources, using HTTP correctly, and so on, they are largely clueless. Probe a bit further and really all they've done is create some simple HTTP endpoint that takes query params or XML in the request body.

Now, there's nothing wrong at all with simple endpoints, and they're damn useful. However, trying to pass that off as REST is another thing. Those folks just seem to think, "If it's not SOAP, I must be doing REST!"

Wrong. So very wrong. Reminds me of this blog I read recently, and its associated reddit discussion.


Thursday, April 1, 2010

SQLPlus options

So, for the truly geeky, you can "pass" in parameters to alter the default environment/behavior of sqlplus. The orafaq page helped me figure this out:
  • create a login.sql file for all your commands
  • add the directory to the $SQLPATH env variable (on most client machines, you'll need to create the env var)
  • launch sqlplus, and you should have all your options setup
My current login.sql looks like this:

define _editor=emacs
set timing on
set serveroutput on
set colsep '|'
set linesize 120

Like I said, for the geeky, but damn handy when you don't want to muck with an IDE like TOAD.

Tuesday, January 5, 2010

IBM J9 Diagnostics

Here at $DAYJOB we're using the IBM J9 JVM for our test and production deployments. I've had some performance and memory problems recently, so here's a quick entry about some of the things the J9 JVM provides for diagnostics.

First up: the diagnostics documentation summary.

Next, here's a series of things you can get the J9 JVM to give you:
  • GC log - use the standard -verbose:gc command line option to get an XML file dump of garbage collection activity. You can also set the output directory.
  • Heapdump log - generated when you call it explicitly in your app, or more likely, automatically when you have an OutOfMemory condition. *.phd files are created which are not human-readable
  • Javadump log (a/k/a javacore or thread dump) - contains human-readable data on the operating system, the application environment, threads, stacks, locks, and memory. Creates files starting with javacore...text
  • System dump (a/k/a core dump) - typically only produced when the jvm fails unexpectedly. Log contains active processes, threads, and system memory. Creates files named Snap*.trc

Wednesday, August 19, 2009

Regex for parsing HTTP query string

Instead of using a lame-ass StringTokenizer looking for an ampersand, here's a regex I found handy:

((\w*)+)(=)[\w\d]*[^(\&)]

The extra parens around the first token allow you to treat the parameter name as a group by itself. Could've added it to the value as well, I suppose .... but clearly I don't need that now!
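
Here's what that looks like wired up in Java. Note I'm using a slightly simpler (and equivalent-in-spirit) pattern than the one above, `(\w+)=([^&]*)`, which captures the name and value as separate groups; the class and method names are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Parse an HTTP query string with a regex instead of StringTokenizer.
// Group 1 is the parameter name, group 2 the value (possibly empty).
public class QueryParser {
    static final Pattern PARAM = Pattern.compile("(\\w+)=([^&]*)");

    static Map<String, String> parse(String query) {
        Map<String, String> params = new LinkedHashMap<>(); // preserve order
        Matcher m = PARAM.matcher(query);
        while (m.find())
            params.put(m.group(1), m.group(2));
        return params;
    }
}
```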

Monday, March 2, 2009

Erlang records

Erlang progress update: I've been sidetracked working on some Java reflection for $DAYJOB (not bad/unfriendly stuff, just needed to work through it), so I'm now back on my side project. Working through Joe Armstrong's book, I'm now looking at Erlang records (pg 69+). Records seem like a cool way to add names to tuples, so you end up with something that feels like a name-value pair map or, if you take the analogy further, almost like an object. Granted, a tuple is a fixed-size contiguous structure (it's Erlang's lists that are linked lists), and I'm not sure of the implementation of a record, whether it keeps an index of pointers to each named tuple 'field', or anything like that (yeah, I coulda google'd - it's still early though), but still, a handy feature for OO folks.

Thursday, February 26, 2009

Java Annotations and Proxies

Just discovered this: if you create a class with annotations of any kind, those annotations (and any other metadata) are lost when the object is proxied. Google searching didn't turn up much on the subject, but I did find this post about one workaround (though it only relates to Hibernate). This seems sort-of innocuous on the surface (from a language/API level), but with every framework under the sun using reflection/proxies to add real value, this really does suck.

Any suggestions would be greatly appreciated. So would free cocktails....

UPDATE: OK- here's what I've now uncovered. In my original case, I was getting a bean via Spring and was trying to get the annotations by simply reflecting on various methods - no dice. In my real world case, however, I am looking at the target object via an aspect (meaning, I am inspecting the invoked object while in the aspect code). From the ProceedingJoinPoint object that is a parameter into my aspect method, I can see all the annotations.

I think I still need a cocktail...

Tuesday, February 17, 2009

Erlang "processes" clarified

Now that I'm into erlang, I've been going through the Getting Started guide on the erlang.org site, and I found this little nugget:
the term "process" is usually used when the threads of execution share no data with each other and the term "thread" when they share data in some way. Threads of execution in Erlang share no data, that's why we call them processes.
I was always confused about erlang "processes" - if they were full-blown executable VM and OS-level processes or what have you. It's getting clearer now...

Thursday, February 12, 2009

New Project - Erlang E-commerce

Well, now that we've moved to California (Sept 2008), and things are finally settling in, I'm looking for a new project to play with. At the end of a long conversation with a current coworker, Steve Atkinson, about concurrency and the bear it can be to wrestle with (especially in regards to shared state), I admitted that I'm interested in seeing how the new crop of functional programming languages deals with these types of issues.

So, to jump on a bandwagon (that's probably already left the station), I've decided to start exploring Erlang. What I want to do is create a small e-commerce style of application to see how the non-shared state model deals with the state of objects/entities like "customer", "order", and "product" over the lifetime of the application. I suspect my point of departure will look like a typical Java web-app (it's how I pay the rent), but I'm hoping to divert into something more of a true erlang app (whatever that may be). Further, since I have absolutely no background in functional languages, this should be a good learning and growing experience.

I'll try to journal as much of the interesting details, thoughts, and ideas as I go along. However, we're expecting our second baby in about six weeks (holy cow!), so I'm not sure how much progress I'll make in the spring or summer. Either way, I'm hoping to gain another perspective on this whole programming thing...

Tuesday, December 9, 2008

Creating a Java annotation

So I created a new (method-level) annotation that basically just takes a string; when the target method is invoked, via an aspect I get that string. The annotation looks like this (name changed to protect the innocent):

public @interface Annotatable { String value(); }


After a few hours of banging my head trying to figure out why everything compiled correctly, yet when testing I could never find the annotation on the target method, it turns out (and this was soooo poorly documented by the Sun/Java gods) that you need to annotate your annotation, like this:

import java.lang.annotation.*;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface Annotatable { String value(); }


The RetentionPolicy enum tells the compiler to keep the annotation available at runtime; the default (RetentionPolicy.CLASS) keeps it in the class file but makes it invisible to reflection. The Target annotation tells the compiler which types of elements your annotation can apply to (classes, methods, params, all, etc.). The long and short of it is: don't forget to annotate your annotations.
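
To make the whole loop concrete, here's a self-contained sketch: an annotated annotation, a target method, and the reflective lookup that only works because of RetentionPolicy.RUNTIME. The class, method, and "hello" value are all made up for illustration.

```java
import java.lang.annotation.*;
import java.lang.reflect.Method;

public class AnnotationDemo {
    // Without @Retention(RUNTIME), getAnnotation() below returns null.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface Annotatable { String value(); }

    @Annotatable("hello")
    public void doWork() { }

    // Reflectively read the annotation's value off the target method.
    static String readAnnotation() {
        try {
            Method m = AnnotationDemo.class.getMethod("doWork");
            Annotatable a = m.getAnnotation(Annotatable.class);
            return a == null ? null : a.value();
        } catch (ReflectiveOperationException e) {
            return null;
        }
    }
}
```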

Wednesday, August 6, 2008

NUMA and the JVM

Good blog entry from Jon Masamitsu about NUMA optimizations in Java 6 on Solaris. NUMA, essentially, and vastly simplified, means accessing a region of memory that is physically closer to a processor on a multi-processor system; this way there's less latency when reading/writing to that memory region. For Java, the optimizations are made primarily to the Eden (young generation) heap space, as well as assigning a thread to a particular CPU.

To enable the feature, it's a command-line param to the JVM at startup: -XX:+UseNUMA .

Java 6 threading article

An article on Java 6 threading optimizations recently appeared on infoq. It's a good article in two parts, but here I'm just going to capture some of the interesting notes about the different locking features now in the JVM (most of this entry is paraphrase - that is, notes to myself).

Escape analysis - determines the scope of all references in an app. If HotSpot can determine the refs are limited to local scope and none can escape, it can have the JIT apply runtime optimizations.

Lock elision - when refs to a lock are limited to local scope (for example, creating and modifying a StringBuffer), no other thread will ever have access to the object; hence the lock is never contended. You really don't need the lock anyway, so it can be elided/omitted.
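
Here's the canonical shape of elidable locking, as a toy illustration (the class and method names are mine). StringBuffer's methods are synchronized, but the buffer never escapes the method, so the JIT can prove the lock is uncontendable and drop it:

```java
// A lock that can be elided: sb never escapes concat(), so escape analysis
// can prove no other thread will ever touch its monitor, and the JIT may
// drop the synchronization on each append() entirely.
public class ElisionDemo {
    static String concat(String a, String b) {
        StringBuffer sb = new StringBuffer(); // synchronized methods...
        sb.append(a);                         // ...but the lock is
        sb.append(b);                         // provably thread-local
        return sb.toString();
    }
}
```

(Whether elision actually happens is a JIT decision you can't observe from the result; the code behaves identically either way, which is the whole point.)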

Biased Locking - most locks are never accessed by more than one thread, and even when multiple threads do share data, access is rarely contended. Long story short, this makes subsequent lock acquisitions less expensive by holding onto the lock until somebody else wants it. Java 6 does this by default now.

Lock Coarsening (or merging) - occurs when adjacent synchronized blocks may be merged into one (if the same lock is used for all of them). For example, when calling a series of StringBuffer append() operations. Locks are not coarsened inside of a loop because the lock would be held for (potentially) too long.

Thread suspending versus spinning - when a thread waits for a lock, it is usually suspended by the OS. This involves taking it off the stack, rescheduling, etc. However, most locks are held for very brief time periods (based on profiling), so if the second thread just waits a little bit without being suspended, it can probably acquire the lock it wants. To wait, it just goes into a busy loop - known as spin locking. This was introduced in Java 1.4.2 with a default (fixed) spin of 10 iterations before suspending the thread.

Adaptive Spinning - spin duration is not fixed anymore, but based on a policy informed by previous spin attempts on the same lock and the state of the lock owner. If spinning is likely to succeed, it will go for a longer iteration count (say, 100); else, it will bail on spinning altogether and suspend. Introduced in Java 6.

Wednesday, July 2, 2008

devWorks Benchmarking article

This is a great article from the IBM devWorks site about Java performance benchmarking. I'm just capturing some notes in this entry.

Measuring time:
  1. System.currentTimeMillis() - gets the "wall clock" time; the call to the OS returns instantly, but the updates from the OS are hardware dependent and may only occur every ~10 ms.
  2. System.nanoTime() - returns a differential time, measured in nanoseconds, but the call to the OS itself can take microseconds.
  3. ThreadMXBean - JMX extension that offers a way to read a thread's CPU usage (may be misleading due to I/O, and its usage may be expensive)
Code warmup:
  • Class loading can be observed via ClassLoadingMXBean
  • Most VMs run the code for a while in interpreted mode to gather stats before performing JIT compilation. Sun's HotSpot defaults: 1,500 invocations for client VMs, 10,000 for server. Could use CompilationMXBean to measure JIT time, but the impl is hosed. Alternative is to watch stdout with the -XX:+PrintCompilation JVM option
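
Put together, the two lists above suggest a minimal harness: warm the code up so the JIT has had a chance to compile it, then time with System.nanoTime() (remembering only differences between two calls are meaningful). This is a sketch, not the article's code; the Task interface and names are mine.

```java
// Minimal benchmark harness: warmup loop first (to get past interpreted
// mode and trigger JIT compilation), then a measured loop timed with the
// differential System.nanoTime() clock.
public class Bench {
    interface Task { void run(); }

    static long timeMillis(Task task, int warmupIters, int measuredIters) {
        for (int i = 0; i < warmupIters; i++) task.run(); // let the JIT kick in
        long start = System.nanoTime();
        for (int i = 0; i < measuredIters; i++) task.run();
        return (System.nanoTime() - start) / 1_000_000;   // ns -> ms
    }
}
```

A real harness would also pin down GC and dead-code elimination effects, but this captures the warmup-then-measure shape the article describes.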

Friday, May 23, 2008

Grails + email service

Alright, finally getting back into grails coding after a long time away. For my test pet store application, I decided to take a day and create a newsletter sender. Basically, it just takes an email address (submitted via a little form widget thingy) and stores it in a separate table in the database. I'm not bothering with user accounts for the site yet, so I'm just keeping it in a stupidly simple table with just a database id and the address itself. There's also a controller function for removing an address from the list (handy, I should think).

Now that I've got email addresses to send to, I need to actually send the newsletters. Before tackling that, though, I want to send out the newsletters on a periodic basis, so I need a scheduling/cron-like component. I decided to use the Quartz plugin for Grails. We use it at my $DAYJOB, and it's been an excellent workhorse there. After installing the plugin (grails install-plugin quartz) and creating a job (app-home/grails-app/job/SendNewsletterJob.groovy), I was 80% done. I just had to define my scheduling and implement the execute() method, which calls my message send service. Easy! Here's my source:

class SendNewsletterJob {
    def timeout = 9000l // runs every nine seconds (only for testing!)
    def emailService

    def execute() {
        emailService.sendNewsletter()
    }
}


Now for actually pushing the newsletter to the users. I created an app-home/grails-app/services/EmailService.groovy. In Grails parlance, from what I understand, a "service" is not a web-service (REST or SOAP), per se, but more like the Eric Evans DomainDrivenDesign notion of a service. Currently (May 2008) there is no nice plugin for sending email, but there is a nice document on the Grails site that describes how to create a mechanism that wraps Spring mail. I just ripped off the example EmailService, dropped my smtp host values into my grails-app/conf/spring/resources.xml, and I'm in business.

Altogether, this project took less than four hours, most of which was fishing around playing with config settings and such. I'm trying to think what the parallel time investment would have been for a straight-up Java implementation. Hmmmmm.........