liveana 0.1.0-SNAPSHOT
An investigation into a day's worth of IceCube Live message traffic
Clojure bookkeeping:

    (ns liveana.core
      (:require [clojure.data.json :as json]
                [clojure.java.io]
                [liveana.i3d3 :refer [i3d3 time-to-js]]
                [clojure.pprint :refer [pprint]]
                [clj-time.core :refer [date-time interval in-seconds now]]
                [clj-ssh.cli :refer [ssh sftp]]))
First look at a day's worth of messages

I've unpacked one day's worth of SPADE Priority 3 files from the Data Warehouse (everything from
We want to see the relative abundance of messages by service name. This is a very short Clojure or Python program. The Clojure code is enclosed in a comment form, so it is not evaluated when the namespace is loaded:

    (comment
      (frequencies
       (for [file (->> "/Users/jacobsen/Desktop/liveana/resources/"
                       clojure.java.io/file
                       file-seq)
             :when (.isFile file)
             message (->> file clojure.java.io/reader json/read)]
         (message "service"))))
Our relative abundances are shown in the map (dictionary) below. Execution time is 6.8 seconds in Clojure, 53 seconds in Python. The 13 messages that have

    {nil 13, "I3DAQDispatch" 11283, "pdaq" 29835, "OpticalFollowUp" 39762,
     "PFRawWriter" 10220, "I3MoniPhysA" 287, "TFRateMonitor" 479,
     "uptimer" 1696, "sn-email" 12, "I3MoniDomMon" 288, "livecontrol" 2595,
     "diskmon-expcont" 192, "GammaFollowUp" 39752, "PFFiltWriter" 15200,
     "temperature" 882, "I3MoniDomTcal" 287, "I3MoniDomSn" 287,
     "meteorology" 144, "PFServer1" 8508, "PFServer2" 8474, "sndaq" 1037,
     "PFServer3" 8460, "I3MoniMover" 667, "HSiface" 9713,
     "PFFiltDispatch" 2165, "PFServer4" 8449, "DB" 624}
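Clojure maps print in no particular order, so an abundance table like the one above is easier to scan when sorted by count. A minimal sketch; sorted-by-count is a hypothetical helper, and the usage line runs it on a three-entry excerpt of the service counts above:

```clojure
(ns liveana.sort-sketch)

(defn sorted-by-count
  "Returns the [k n] entries of a frequencies map, largest count first."
  [freqs]
  (sort-by val > freqs))

;; Usage, on an excerpt of the service counts above:
(sorted-by-count {"pdaq" 29835, "OpticalFollowUp" 39762, "GammaFollowUp" 39752})
;; => (["OpticalFollowUp" 39762] ["GammaFollowUp" 39752] ["pdaq" 29835])
```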
Refactoring the above code slightly, we pull out a function to extract all messages from the files in our directory:

    (def dirname "/Users/jacobsen/Desktop/liveana/resources/")

    (defn day-msgs []
      (for [file (->> dirname clojure.java.io/file file-seq)
            :when (.isFile file)
            message (->> file clojure.java.io/reader json/read)]
        message))
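day-msgs is lazy and never closes the readers it opens. If file handles become a concern, an eager variant can close each file before opening the next. This is a sketch under stated assumptions: read-messages-eagerly is a hypothetical name, and parse-fn stands in for json/read (assuming, as day-msgs does, that each file holds one JSON array of messages). The parser is passed in so the sketch runs without the JSON library.

```clojure
(ns liveana.read-sketch
  (:require [clojure.java.io :as io]))

(defn read-messages-eagerly
  "Hypothetical eager counterpart to day-msgs: reads every regular file
  under dir with parse-fn, closing each reader before opening the next.
  parse-fn takes a java.io.Reader and returns a sequence of messages."
  [dir parse-fn]
  (into []
        (mapcat (fn [^java.io.File f]
                  (with-open [rdr (io/reader f)]
                    ;; doall realises the parse before with-open closes rdr
                    (doall (parse-fn rdr)))))
        (filter #(.isFile ^java.io.File %) (file-seq (io/file dir)))))

;; In the notebook this would be called as (read-messages-eagerly dirname json/read).
```

The trade-off is memory: the whole day's messages are held in one vector, which is fine for a single day but gives up the laziness of day-msgs.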
Using day-msgs, we can count the messages; the total should equal the sum of the per-service counts above, 201311:

    (comment (count (day-msgs)))
What sort of lengths do the messages have? Though the details depend on the transport mechanism, as a rough measure we can take the length of each message serialized as a JSON string:

    (defn average-message-length []
      (let [lengths (->> (day-msgs) (map (comp count json/write-str)))]
        ;; divide by the actual number of messages rather than a
        ;; hard-coded total
        (float (/ (reduce + lengths) (count lengths)))))
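A mean can also be computed in a single pass that carries a running sum and count, so the number of messages need not be known in advance. A minimal sketch; mean is a hypothetical helper, and the commented line shows how it might be fed the JSON lengths used above:

```clojure
(ns liveana.mean-sketch)

(defn mean
  "Arithmetic mean of a sequence of numbers, computed in one pass with a
  running [sum count] pair. Returns nil for an empty sequence."
  [xs]
  (let [[total n] (reduce (fn [[s n] x] [(+ s x) (inc n)]) [0 0] xs)]
    (when (pos? n)
      (double (/ total n)))))

;; With day-msgs from above (not run here):
;; (mean (map (comp count json/write-str) (day-msgs)))
(mean [100 200 600])
;; => 300.0
```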
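To see how the lengths are distributed, we first convert values to bins...

    (defn hist-values
      "Counts xs into nbins equal-width bins spanning [xmin, xmax).
      Values outside that range are dropped."
      [xmin xmax nbins xs]
      (let [binfunc (fn [x]
                      ;; floor rather than int, so values just below xmin
                      ;; get a negative bin index instead of landing in bin 0
                      (long (Math/floor (* nbins (/ (- x xmin) (double (- xmax xmin)))))))
            freqs (frequencies (map binfunc xs))]
        (map #(get freqs % 0) (range nbins))))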
...and use i3d3 to show the distribution of message lengths (as serialized into JSON):

    (defn length-distributions []
      (let [bins (->> (day-msgs)
                      (map (comp count json/write-str))
                      (hist-values 0 3000 300))]
        (i3d3 "plot1" {:data [{:type "bars" :bins bins :color "#3b6c9d" :range [0 3000]}]
                       :size [700 250]
                       :yscale "log"
                       :xlabel "JSON message length, bytes"
                       :ylabel "Entries"})))
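Messages longer than the upper edge of the histogram range are silently dropped by hist-values, so the 0 to 3000 byte plot cannot show how many messages it leaves out. A hypothetical variant, hist-with-overflow, keeps the same equal-width binning but also reports underflow and overflow counts; it is a sketch, not part of the original analysis:

```clojure
(ns liveana.hist-sketch)

(defn hist-with-overflow
  "Hypothetical variant of hist-values: counts xs into nbins equal-width
  bins over [xmin, xmax) and also reports how many values fell below
  (:underflow) or at/above (:overflow) that range."
  [xmin xmax nbins xs]
  (let [width (/ (double (- xmax xmin)) nbins)
        ;; floor, not truncation, so x slightly below xmin gets bin -1
        bin-of (fn [x] (long (Math/floor (double (/ (- x xmin) width)))))
        freqs (frequencies (map bin-of xs))]
    {:bins (mapv #(get freqs % 0) (range nbins))
     :underflow (reduce + 0 (for [[b n] freqs :when (neg? b)] n))
     :overflow (reduce + 0 (for [[b n] freqs :when (>= b nbins)] n))}))

(hist-with-overflow 0 10 5 [-1 0 1 2 3 9.9 10 12])
;; => {:bins [2 2 0 0 1], :underflow 1, :overflow 2}
```

Note that a value exactly equal to xmax counts as overflow, matching the half-open [xmin, xmax) convention.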
How about priorities? How are they distributed?

    (defn prio-distributions []
      (let [bins (->> (day-msgs)
                      (map #(% "prio"))
                      ;; three unit-wide bins centred on priorities 1, 2 and 3
                      (hist-values 0.5 3.5 3))]
        (i3d3 "plot2" {:data [{:type "bars" :bins bins :color "grey" :range [0.5 3.5]}]
                       :size [700 250]
                       :xlabel "Priorities"
                       :ylabel "Entries"})))
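Because priorities take only a few integer values, they can also be counted directly, which sidesteps any question of how histogram bins line up with the integers. A minimal sketch; prio-counts is a hypothetical helper, run here on made-up sample messages:

```clojure
(ns liveana.prio-sketch)

(defn prio-counts
  "Counts messages per \"prio\" value, returned as a map sorted by priority."
  [msgs]
  (into (sorted-map) (frequencies (map #(get % "prio") msgs))))

(prio-counts [{"prio" 1} {"prio" 3} {"prio" 1} {"prio" 2}])
;; => {1 2, 2 1, 3 1}
```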
How about viewing message lengths broken down by priority?

    (defn lengths-by-priority [p div]
      (let [bins (->> (day-msgs)
                      (filter #(= (% "prio") p))
                      (map (comp count json/write-str))
                      (hist-values 0 3000 300))]
        (i3d3 div {:data [{:type "bars" :bins bins :color "#3b6c9d" :range [0 3000]}]
                   :size [700 150]
                   :yscale "log"
                   :xlabel "JSON message length, bytes"
                   :ylabel "Entries"})))
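lengths-by-priority re-reads every file once per priority. A hypothetical one-pass alternative groups the lengths by priority with reduce and update, after which each priority's vector could be handed to hist-values. In this sketch, len-fn stands in for (comp count json/write-str), and the sample messages are made up:

```clojure
(ns liveana.group-sketch)

(defn lengths-by-prio
  "Groups message lengths by each message's \"prio\" value in one pass.
  len-fn computes a message's length."
  [len-fn msgs]
  (reduce (fn [acc m]
            ;; fnil starts each priority's group with an empty vector
            (update acc (get m "prio") (fnil conj []) (len-fn m)))
          {}
          msgs))

(lengths-by-prio #(count (get % "x"))
                 [{"prio" 1 "x" "a"} {"prio" 3 "x" "bb"} {"prio" 1 "x" "ccc"}])
;; => {1 [1 3], 3 [2]}
```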
This is a work in progress. Next steps are to explore more characteristics of this data, and to consider ways to meet the new requirements being proposed for IceCube Live messaging.
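Next, look at message times. time-for parses a timestamp string into a clj-time date-time; note that it discards the fractional seconds:

    (defn time-for [s]
      (->> s
           (re-find #"^(\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2}):(\d{2})(?:\.(\d{0,10}))?$")
           rest
           (take 6)   ; year, month, day, hour, minute, second; fraction dropped
           (map #(Integer/parseInt %))
           (apply date-time)))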
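Truncating to whole seconds matters for the sub-second gaps between OFU messages. A sketch of an alternative parser that keeps the fraction, using java.time (JDK 8+) in place of clj-time. The names live-time-format, parse-live-time and seconds-between are hypothetical; the formatter accepts 0 to 9 fractional digits (the regex in time-for allows up to 10), and timestamps are treated as zone-less local times, which does not affect differences:

```clojure
(ns liveana.time-sketch
  (:import (java.time Duration LocalDateTime)
           (java.time.format DateTimeFormatter DateTimeFormatterBuilder)
           (java.time.temporal ChronoField)))

(def ^DateTimeFormatter live-time-format
  ;; e.g. "2013-11-20 17:53:12.213043"; the fractional part is optional
  (-> (DateTimeFormatterBuilder.)
      (.appendPattern "uuuu-MM-dd HH:mm:ss")
      (.appendFraction ChronoField/NANO_OF_SECOND 0 9 true)
      (.toFormatter)))

(defn parse-live-time
  "Parses a Live timestamp string into a LocalDateTime, keeping the
  fractional seconds."
  [^String s]
  (LocalDateTime/parse s live-time-format))

(defn seconds-between
  "Elapsed time from a to b in seconds, as a double (negative if b precedes a)."
  [a b]
  (/ (.toNanos (Duration/between a b)) 1e9))

(seconds-between (parse-live-time "2013-11-20 17:53:12.213043")
                 (parse-live-time "2013-11-20 17:53:12.727890"))
;; about 0.515 s; time-for would report these two as 0 s apart
```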
    (defn delta-times []
      (let [first-time (->> (day-msgs) (map #(% "t")) first time-for)
            max-t-sec 250
            bins (->> (day-msgs)
                      (map #(% "t"))
                      (map time-for)
                      ;; interval throws if a message predates first-time,
                      ;; so this assumes day-msgs returns messages in time order
                      (map #(interval first-time %))
                      (map in-seconds)
                      (partition 2 1)
                      (map (fn [[a b]] (- b a)))
                      (hist-values 0 max-t-sec 100))]
        (i3d3 "plot6" {:data [{:type "bars" :bins bins :color "#3b6c9d" :range [0 max-t-sec]}]
                       :size [700 250]
                       :yscale "log"
                       :xlabel "Times between messages, seconds"
                       :ylabel "Entries"})))

    (delta-times)
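delta-times takes differences in the order day-msgs yields messages, and file-seq makes no ordering promise (java.io.File's listFiles does not guarantee any particular order). A minimal sketch that sorts first, so the gaps do not depend on file order; consecutive-gaps is a hypothetical helper working on plain numbers such as seconds since a reference time:

```clojure
(ns liveana.gaps-sketch)

(defn consecutive-gaps
  "Sorts timestamps (numbers, e.g. seconds since a reference time) and
  returns the differences between neighbours, all non-negative."
  [ts]
  (->> ts
       sort
       (partition 2 1)
       (map (fn [[a b]] (- b a)))))

(consecutive-gaps [5 1 3 10])
;; => (2 2 5)
```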
We assume most of the priority-1 data we're sending is moni messages. Verify this:

    (defn message-type-frequencies []
      (->> (day-msgs)
           (filter #(= (% "prio") 1))
           (map #(% "cmd"))
           frequencies
           println))
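To put a number on "most", the command counts can be expressed as fractions of the total. A minimal sketch; fractions is a hypothetical helper that expects a frequencies map such as the one message-type-frequencies builds before printing it (the input below is made up):

```clojure
(ns liveana.fraction-sketch)

(defn fractions
  "Given a frequencies map, returns each key's share of the total."
  [freqs]
  (let [total (reduce + (vals freqs))]
    (into {} (for [[k n] freqs] [k (double (/ n total))]))))

(fractions (frequencies ["moni" "moni" "moni" "log"]))
;; => {"moni" 0.75, "log" 0.25}
```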
What are the ten most common varnames monitored at priority 1? To answer this, we want a function that takes a map of key/value pairs, sorts the values, and finds the keys that yield the largest values:

    (defn unroll-map-sorted [m]
      (let [values (-> m vals set sort reverse)]   ; distinct values, largest first
        (for [v values]
          [v (for [k (keys m) :when (= (m k) v)] k)])))
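unroll-map-sorted scans all keys once for every distinct value. A sketch of an equivalent built on group-by, which visits each entry once; unroll-map-sorted* is a hypothetical name, and it returns the same [value keys] shape:

```clojure
(ns liveana.unroll-sketch)

(defn unroll-map-sorted*
  "[value keys-with-that-value] pairs from map m, largest value first."
  [m]
  (->> (group-by val m)                ; value -> vector of map entries
       (sort-by key >)                 ; largest value first
       (map (fn [[v entries]] [v (map key entries)]))))

(unroll-map-sorted* {"a" 3, "b" 1, "c" 3})
;; => ([3 ("a" "c")] [1 ("b")])
```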
Our most common monitored scalar quantities at priority 1 are found with:

    (defn common-prio-1-monis []
      (->> (day-msgs)
           (filter #(= (% "prio") 1))
           (filter #(= (% "cmd") "moni"))
           (map #((% "payload") "varname"))
           frequencies
           unroll-map-sorted
           (take 10)))
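The same chain of filters (priority 1, cmd "moni", optionally a given varname) recurs in several functions in this section. A sketch of a shared predicate; prio-1-moni? is a hypothetical name, and it uses get-in so a message without a payload simply fails the test rather than throwing. The sample messages are made up:

```clojure
(ns liveana.filter-sketch)

(defn prio-1-moni?
  "True when msg is a priority-1 \"moni\" message. With a varname
  argument, the payload's \"varname\" must also match."
  ([msg]
   (and (= 1 (get msg "prio"))
        (= "moni" (get msg "cmd"))))
  ([varname msg]
   (and (prio-1-moni? msg)
        (= varname (get-in msg ["payload" "varname"])))))

(def sample
  [{"prio" 1 "cmd" "moni" "service" "OpticalFollowUp" "payload" {"varname" "state"}}
   {"prio" 1 "cmd" "log" "service" "pdaq"}
   {"prio" 2 "cmd" "moni" "service" "sndaq" "payload" {"varname" "state"}}])

(count (filter prio-1-moni? sample))                    ;; => 1
(count (filter (partial prio-1-moni? "state") sample))  ;; => 1
```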
Which service is sending the most "state" messages?

    (defn biggest-state-reporters []
      (->> (day-msgs)
           (filter #(= (% "prio") 1))
           (filter #(= (% "cmd") "moni"))
           (filter #(= ((% "payload") "varname") "state"))
           (map #(% "service"))
           frequencies
           unroll-map-sorted
           (take 10)))
Holy multimessenger, OFU/GFU (OpticalFollowUp and GammaFollowUp) are totally spamming ITS with their state reporting. How frequently are those reported?
    (defn deltats [s]
      (->> s
           (partition 2 1)
           (map (fn [[a b]] (interval a b)))))

    (let [maxdt 200
          bins (->> (day-msgs)
                    (filter #(= (% "prio") 1))
                    (filter #(= (% "cmd") "moni"))
                    (filter #(= (% "service") "OpticalFollowUp"))
                    (filter #(= ((% "payload") "varname") "state"))
                    (map (comp time-for #(% "t")))
                    deltats
                    ;; time-for drops fractional seconds, so these are whole
                    ;; seconds and only every fifth 0.2 s bin can fill
                    (map in-seconds)
                    (hist-values 0 maxdt 1000))]
      (i3d3 "plot7" {:data [{:type "bars" :bins bins :color "#3b6c9d" :range [0 maxdt]}]
                     :size [700 250]
                     :yscale "log"
                     :xlabel "Times between OFU status messages, seconds"
                     :ylabel "Entries"}))
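A single number for "how frequently" is the mean rate: the number of intervals divided by the span from first to last message. A minimal sketch; mean-rate is a hypothetical helper, applied to the twelve slow-regime OFU "state" timestamps listed further on in this document, converted by hand to seconds after 23:00:00:

```clojure
(ns liveana.rate-sketch)

(defn mean-rate
  "Average event rate in Hz for event times given in seconds: the number
  of intervals, (n - 1), divided by the span from first to last event.
  Returns nil for fewer than two events or a zero span."
  [ts]
  (let [sorted (sort ts)
        n (count sorted)]
    (when (> n 1)
      (let [span (- (last sorted) (first sorted))]
        (when (pos? span)
          (/ (dec n) (double span)))))))

;; Seconds after 23:00:00 for the slow-regime OFU "state" timestamps.
(def slow-regime
  [57.297997 238.387704 418.671330 598.722348 780.788604 963.245657
   1144.784654 1324.845620 1505.534559 1687.422556 1868.565761 2049.676526])

(/ 1 (mean-rate slow-regime))   ;; mean period, about 181 s
```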
This bimodal distribution suggests something goofy. We can inspect the times by writing them back out to a file...

    (defn write-ofu-state-times []
      (->> (day-msgs)
           (filter #(= (% "prio") 1))
           (filter #(= (% "cmd") "moni"))
           (filter #(= (% "service") "OpticalFollowUp"))
           (filter #(= ((% "payload") "varname") "state"))
           (map #(% "t"))
           (#(with-out-str (pprint %)))
           (spit "/tmp/ofutimes")))
Sometimes they look like this, i.e. once every 3 minutes...

    "2013-11-20 23:00:57.297997"
    "2013-11-20 23:03:58.387704"
    "2013-11-20 23:06:58.671330"
    "2013-11-20 23:09:58.722348"
    "2013-11-20 23:13:00.788604"
    "2013-11-20 23:16:03.245657"
    "2013-11-20 23:19:04.784654"
    "2013-11-20 23:22:04.845620"
    "2013-11-20 23:25:05.534559"
    "2013-11-20 23:28:07.422556"
    "2013-11-20 23:31:08.565761"
    "2013-11-20 23:34:09.676526"
...and sometimes like this: about 1 Hz on average, with messages arriving in pairs roughly 0.5 s apart and a new pair every ~2.2 s.

    "2013-11-20 17:53:12.213043"
    "2013-11-20 17:53:12.727890"
    "2013-11-20 17:53:14.443550"
    "2013-11-20 17:53:14.957570"
    "2013-11-20 17:53:16.621265"
    "2013-11-20 17:53:17.185204"
    "2013-11-20 17:53:18.849326"
    "2013-11-20 17:53:19.363238"
    "2013-11-20 17:53:21.077113"
    "2013-11-20 17:53:21.590343"
    "2013-11-20 17:53:23.253939"
    "2013-11-20 17:53:23.818978"
    "2013-11-20 17:53:25.482208"
    "2013-11-20 17:53:25.997032"
    "2013-11-20 17:53:27.676178"
    "2013-11-20 17:53:28.241808"
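Working through the gaps in the fast-regime timestamps listed above makes the pattern explicit. A small worked example; the seconds past 17:53:00 are copied from the listing, and the names are illustrative only:

```clojure
(ns liveana.ofu-burst-sketch)

;; Seconds past 17:53:00 for the 16 fast-regime OFU "state" timestamps.
(def ofu-burst-seconds
  [12.213043 12.727890 14.443550 14.957570 16.621265 17.185204
   18.849326 19.363238 21.077113 21.590343 23.253939 23.818978
   25.482208 25.997032 27.676178 28.241808])

(defn gaps [ts]
  (map (fn [[a b]] (- b a)) (partition 2 1 ts)))

(def burst-gaps (gaps ofu-burst-seconds))

(defn mean [xs] (/ (reduce + xs) (count xs)))

(mean burst-gaps)                           ;; about 1.07 s, i.e. roughly 1 Hz
(apply max (take-nth 2 burst-gaps))         ;; within-pair gaps: all under 0.6 s
(apply min (take-nth 2 (rest burst-gaps)))  ;; between-pair gaps: all over 1.6 s
```

Because time-for keeps only whole seconds, these 0.5 s and 1.7 s gaps collapse to 0, 1 or 2 s in the histograms above.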
Time dependence of the time difference:

    (defn time-difference-vs-t []
      (let [t0 (->> (day-msgs) first (#(% "t")) time-for)
            points (->> (day-msgs)
                        (filter #(= (% "prio") 1))
                        (filter #(= (% "cmd") "moni"))
                        (filter #(= (% "service") "OpticalFollowUp"))
                        (filter #(= ((% "payload") "varname") "state"))
                        (map #(% "t"))
                        (partition 2 1)
                        (take-nth 10) ;; so the plot doesn't get too huge
                        (map (fn [[s1 s2]]
                               ;; time since start:
                               {:x (in-seconds (interval t0 (time-for s1)))
                                ;; time spread:
                                :y (in-seconds (interval (time-for s1) (time-for s2)))})))]
        (i3d3 "plot8" {:data [{:type "points" :values points :color "#3b6c9d" :size 1.5}]
                       :size [700 500]
                       :xlabel "t (seconds)"
                       :ylabel "delta-t"})))