liveana 0.1.0-SNAPSHOT

An investigation into a day's worth of IceCube Live message traffic
[Clojure bookkeeping ...]

(ns liveana.core
  (:require [clojure.data.json :as json]
            [liveana.i3d3 :refer [i3d3 time-to-js]]
            [clojure.pprint :refer [pprint]]
            [clj-time.core :refer [date-time interval in-seconds now]]
            [clj-ssh.cli :refer [ssh sftp]]))
First look at a day's worth of messages

I've unpacked one day's worth of SPADE Priority 3 files from the
Data Warehouse (everything from
We want to see the relative abundance of messages by service name.
This is a very short program in either Clojure or Python. The Clojure code is enclosed in a (comment ...) form so that it is not evaluated when the namespace is loaded:

(comment
  (frequencies (for [file (->> "/Users/jacobsen/Desktop/liveana/resources/"
                               clojure.java.io/file
                               file-seq)
                     :when (.isFile file)
                     message (->> file
                                  clojure.java.io/reader
                                  json/read)]
                 (message "service"))))
Our relative abundances are shown in the map (dictionary) below. Execution time is 6.8 seconds in Clojure and 53 seconds in Python. The 13 messages that have

{nil 13,
 "I3DAQDispatch" 11283,
 "pdaq" 29835,
 "OpticalFollowUp" 39762,
 "PFRawWriter" 10220,
 "I3MoniPhysA" 287,
 "TFRateMonitor" 479,
 "uptimer" 1696,
 "sn-email" 12,
 "I3MoniDomMon" 288,
 "livecontrol" 2595,
 "diskmon-expcont" 192,
 "GammaFollowUp" 39752,
 "PFFiltWriter" 15200,
 "temperature" 882,
 "I3MoniDomTcal" 287,
 "I3MoniDomSn" 287,
 "meteorology" 144,
 "PFServer1" 8508,
 "PFServer2" 8474,
 "sndaq" 1037,
 "PFServer3" 8460,
 "I3MoniMover" 667,
 "HSiface" 9713,
 "PFFiltDispatch" 2165,
 "PFServer4" 8449,
 "DB" 624}
Refactoring the above code slightly, we pull out a function to extract all messages from the files in our directory.

(def dirname "/Users/jacobsen/Desktop/liveana/resources/")

(defn day-msgs []
  (for [file (->> dirname
                  clojure.java.io/file
                  file-seq)
        :when (.isFile file)
        message (->> file
                     clojure.java.io/reader
                     json/read)]
    message))
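The Python version of this program is not shown. For comparison, here is a minimal Python sketch of the same two steps: pull every message out of the files, then count the messages by service. This is not the author's code. The names iter_messages and service_frequencies are hypothetical. The sketch assumes that each file holds a JSON array of message objects, which is what the Clojure for over json/read implies.

```python
import json
from collections import Counter
from pathlib import Path


def iter_messages(dirname):
    """Lazily yield every message from every file under dirname.

    Walks subdirectories too (like file-seq) and assumes each file holds
    a JSON array of message objects.
    """
    for path in sorted(Path(dirname).rglob("*")):
        if path.is_file():
            with path.open() as f:
                yield from json.load(f)


def service_frequencies(messages):
    """Count messages by "service"; a missing field is counted as None."""
    return Counter(m.get("service") for m in messages)
```

For example, call service_frequencies(iter_messages(dirname)) with dirname pointing at the unpacked files. Like day-msgs, iter_messages is a lazy sequence. Unlike day-msgs, it closes each file after reading it.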
Using day-msgs, we can count the total number of messages:

(comment (count (day-msgs)))

=> 201311
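frequencies counts each message exactly once, so the per-service counts in the map above must add up to the total message count. Below is a quick Python check, with the counts copied from that map (None stands for nil).

```python
# Per-service message counts copied from the frequency map above;
# None stands for Clojure's nil (messages with no "service").
service_counts = {
    None: 13, "I3DAQDispatch": 11283, "pdaq": 29835,
    "OpticalFollowUp": 39762, "PFRawWriter": 10220, "I3MoniPhysA": 287,
    "TFRateMonitor": 479, "uptimer": 1696, "sn-email": 12,
    "I3MoniDomMon": 288, "livecontrol": 2595, "diskmon-expcont": 192,
    "GammaFollowUp": 39752, "PFFiltWriter": 15200, "temperature": 882,
    "I3MoniDomTcal": 287, "I3MoniDomSn": 287, "meteorology": 144,
    "PFServer1": 8508, "PFServer2": 8474, "sndaq": 1037,
    "PFServer3": 8460, "I3MoniMover": 667, "HSiface": 9713,
    "PFFiltDispatch": 2165, "PFServer4": 8449, "DB": 624,
}

# frequencies counts each message once, so this equals (count (day-msgs)).
total = sum(service_counts.values())

# Fraction of all messages coming from the two follow-up services.
follow_up_share = (service_counts["OpticalFollowUp"]
                   + service_counts["GammaFollowUp"]) / total
```

The sum is 201311. The two follow-up services, OpticalFollowUp and GammaFollowUp, together account for about 39.5% of the day's messages.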
What sort of lengths do the messages have? The details depend on the transport mechanism, but as a rough measure we can take the length of each message serialized as a JSON string.

(defn average-message-length []
  ;; mean JSON-serialized length over all of the day's messages
  (let [lengths (map (comp count json/write-str) (day-msgs))]
    (float (/ (reduce + lengths) (count lengths)))))
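Here is a Python sketch of the same average, assuming the messages are already-parsed JSON objects. By default, Python's json.dumps puts a space after each ',' and ':'. The sketch therefore passes compact separators to approximate the compact output of clojure.data.json's write-str. The two libraries may still escape some characters differently, so expect small differences rather than identical numbers. The helper names are hypothetical.

```python
import json


def compact_json_length(message):
    """Length of message as compact JSON. json.dumps escapes non-ASCII by
    default, so the output is ASCII and characters equal bytes."""
    return len(json.dumps(message, separators=(",", ":")))


def average_message_length(messages):
    """Mean compact-JSON length, dividing by the real message count."""
    lengths = [compact_json_length(m) for m in messages]
    return sum(lengths) / len(lengths)
```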
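To see how the lengths are distributed, we first group the values into histogram bins...

(defn hist-values
  "Counts of xs in nbins equal-width bins spanning [xmin, xmax);
  values outside that range are dropped."
  [xmin xmax nbins xs]
  (let [binfunc (fn [x]
                  ;; floor rather than truncation toward zero, so values
                  ;; just below xmin are not counted in bin 0
                  (long (Math/floor (double (* nbins (/ (- x xmin)
                                                         (- xmax xmin)))))))
        binned-values (map binfunc xs)
        freqs (frequencies binned-values)]
    (map #(get freqs % 0) (range nbins))))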
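The same binning can be sketched in Python for experimentation. This version floors the scaled value instead of truncating toward zero. As a result, a value slightly below xmin falls outside the bins instead of being counted in bin 0. As in hist-values, anything at or above xmax is dropped. The name hist_values just mirrors the Clojure function.

```python
import math
from collections import Counter


def hist_values(xmin, xmax, nbins, xs):
    """Counts of xs in nbins equal-width bins spanning [xmin, xmax).

    Values outside that range get a bin index outside 0..nbins-1 and
    are therefore dropped from the result.
    """
    freqs = Counter(math.floor(nbins * (x - xmin) / (xmax - xmin)) for x in xs)
    return [freqs.get(i, 0) for i in range(nbins)]
```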
...and use i3d3 to show the distribution of message lengths (as serialized into JSON).

(defn length-distributions []
  (let [bins (->> (day-msgs)
                  (map (comp count json/write-str))
                  (hist-values 0 3000 300))]
    (i3d3 "plot1" {:data [{:type "bars"
                           :bins bins
                           :color "#3b6c9d"
                           :range [0 3000]}]
                   :size [700, 250]
                   :yscale "log"
                   :xlabel "JSON message length, bytes",
                   :ylabel "Entries"})))
How about priorities? How are they distributed?

(defn prio-distributions []
  (let [bins (->> (day-msgs)
                  (map #(% "prio"))
                  (hist-values 0 4 3))]
    (i3d3 "plot2" {:data [{:type "bars"
                           :bins bins
                           :color "grey"
                           :range [0 4]}]
                   :size [700, 250]
                   :xlabel "Priorities"
                   :ylabel "Entries"})))
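With (hist-values 0 4 3 ...), a priority p lands in bin floor(3p/4). Priorities 1, 2 and 3 therefore each get their own bar. A priority 0, if any existed, would share the first bar with priority 1. Because priorities are small integers, counting them directly avoids choosing bin edges at all. Below is a Python sketch of both approaches. It assumes each message has an integer "prio" field and counts a missing field under None. The function names are hypothetical.

```python
import math
from collections import Counter


def prio_bin(p, xmin=0, xmax=4, nbins=3):
    """Bin index that (hist-values 0 4 3 ...) assigns to priority p."""
    return math.floor(nbins * (p - xmin) / (xmax - xmin))


def prio_counts(messages):
    """Count messages per priority; a missing "prio" is counted as None."""
    return Counter(m.get("prio") for m in messages)


# Priorities 1, 2, 3 get separate bins; 0 would share bin 0 with 1.
assignments = {p: prio_bin(p) for p in range(4)}
```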
How about viewing message lengths broken down by priority?

(defn lengths-by-priority [p div]
  (let [bins (->> (day-msgs)
                  (filter #(= (% "prio") p))
                  (map (comp count json/write-str))
                  (hist-values 0 3000 300))]
    (i3d3 div {:data [{:type "bars"
                       :bins bins
                       :color "#3b6c9d"
                       :range [0 3000]}]
               :size [700, 150]
               :yscale "log"
               :xlabel "JSON message length, bytes",
               :ylabel "Entries"})))
This is a work in progress. Next steps are to explore more characteristics of this data and to consider ways to meet new requirements under consideration for IceCube Live messaging.
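Next steps -- look at times. First, we need a parser that turns timestamp strings such as "2013-11-20 23:00:57.297997" into date-times. The parser matches fractional seconds but discards them.

(defn time-for [s]
  (->> s
       (re-find
        #"^(\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2}):(\d{2})(?:\.(\d{0,10}))?$")
       rest      ; drop the full match, keep the capture groups
       (take 6)  ; year, month, day, hour, minute, second
       (map #(Integer/parseInt %))
       (apply date-time)))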
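For comparison, here is a Python sketch of time-for that uses the same regular expression. Like the Clojure version, it keeps only the first six groups, so it discards fractional seconds. Raising ValueError on a non-matching string is an assumption of this sketch. On such input the Clojure version throws instead, because date-time has no zero-argument form.

```python
import re
from datetime import datetime

# Same pattern as time-for: date, time, optional fractional seconds.
TIME_RE = re.compile(
    r"^(\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2}):(\d{2})(?:\.(\d{0,10}))?$")


def time_for(s):
    """Parse "YYYY-MM-DD hh:mm:ss[.fraction]", discarding the fraction."""
    m = TIME_RE.match(s)
    if m is None:
        raise ValueError(f"unrecognized timestamp: {s!r}")
    # groups()[:6] are year..second, like (take 6) after rest
    return datetime(*(int(g) for g in m.groups()[:6]))
```

Sub-second precision matters for the roughly half-second OFU gaps examined further on. For those, datetime.strptime(s, "%Y-%m-%d %H:%M:%S.%f") keeps the microseconds of six-digit fractions like the ones in this data, though it rejects timestamps without a fractional part.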
(defn delta-times []
  (let [first-time (->> (day-msgs)
                        (map #(% "t"))
                        first
                        time-for)
        max-t-sec 250
        bins (->> (day-msgs)
                  (map #(% "t"))
                  (map time-for)
                  (map #(interval first-time %))
                  (map in-seconds)
                  (partition 2 1)
                  (map (fn [[a b]] (- b a)))
                  (hist-values 0 max-t-sec 100))]
    (i3d3 "plot6" {:data [{:type "bars"
                           :bins bins
                           :color "#3b6c9d"
                           :range [0 max-t-sec]}]
                   :size [700, 250]
                   :yscale "log"
                   :xlabel "Times between messages, seconds",
                   :ylabel "Entries"})))

(delta-times)
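The core of delta-times is a pairwise difference over consecutive timestamps. Below is a Python sketch of that step, assuming a list of datetime objects in the order the messages were read. The name consecutive_gaps is hypothetical. clj-time's interval wraps a Joda-Time Interval, which rejects an end earlier than its start. Plain datetime subtraction, by contrast, just yields a negative gap for out-of-order messages.

```python
from datetime import datetime


def consecutive_gaps(times):
    """Seconds from each datetime to the next, like (partition 2 1)
    followed by (- b a). Out-of-order input gives negative gaps."""
    return [(b - a).total_seconds() for a, b in zip(times, times[1:])]


# Example with whole-second times, as produced by time-for.
example = consecutive_gaps([datetime(2013, 11, 20, 23, 0, 57),
                            datetime(2013, 11, 20, 23, 3, 58)])
```

time-for drops fractional seconds, so the Clojure gaps are whole seconds. total_seconds() keeps any fraction that is present in its inputs.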
We assume most of the data we're sending is moni messages. Let's verify this for priority-1 messages:

(defn message-type-frequencies []
  (->> (day-msgs)
       (filter #(= (% "prio") 1))
       (map #(% "cmd"))
       frequencies
       println))
What are the ten most common varnames monitored at priority 1? To answer this, we want a function that takes a map, sorts its distinct values in descending order, and finds the keys that yield each value.

(defn unroll-map-sorted [m]
  (let [values (-> m vals set sort reverse)]  ; distinct values, largest first
    (for [v values]
      [v (for [k (keys m) :when (= (m k) v)] k)])))
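Here is a Python sketch of the same grouping. It builds the groups in a single pass over the map, instead of scanning all keys once per distinct value. unroll_map_sorted is a hypothetical mirror of the Clojure name.

```python
from collections import defaultdict


def unroll_map_sorted(m):
    """Group the keys of m by value, largest value first.

    Returns [(value, [keys...]), ...]. One pass over m builds the groups,
    instead of one scan of all keys per distinct value.
    """
    by_value = defaultdict(list)
    for k, v in m.items():
        by_value[v].append(k)
    return sorted(by_value.items(), key=lambda kv: kv[0], reverse=True)
```

Taking the first ten entries, as (take 10) does, yields the ten largest distinct counts. When counts tie, those entries can cover more than ten keys. The same holds for the Clojure version.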
Our most common monitored scalar quantities at priority 1 are then given by:

(defn common-prio-1-monis []
  (->> (day-msgs)
       (filter #(= (% "prio") 1))
       (filter #(= (% "cmd") "moni"))
       (map #((% "payload") "varname"))
       frequencies
       unroll-map-sorted
       (take 10)))
Which service is sending the most state messages at priority 1?

(defn biggest-state-reporters []
  (->> (day-msgs)
       (filter #(= (% "prio") 1))
       (filter #(= (% "cmd") "moni"))
       (filter #(= ((% "payload") "varname") "state"))
       (map #(% "service"))
       frequencies
       unroll-map-sorted
       (take 10)))
Holy multimessenger: OFU and GFU (OpticalFollowUp and GammaFollowUp) are totally spamming ITS with their state reporting. How frequently are those being reported?
(defn deltats
  "Intervals between consecutive date-times in s."
  [s]
  (->> s
       (partition 2 1)
       (map (fn [[a b]] (interval a b)))))

(defn ofu-state-delta-times []
  (let [maxdt 200
        bins (->> (day-msgs)
                  (filter #(= (% "prio") 1))
                  (filter #(= (% "cmd") "moni"))
                  (filter #(= (% "service") "OpticalFollowUp"))
                  (filter #(= ((% "payload") "varname") "state"))
                  (map (comp time-for #(% "t")))
                  deltats
                  (map in-seconds)  ; whole seconds only
                  (hist-values 0 maxdt 1000))]
    (i3d3 "plot7" {:data [{:type "bars"
                           :bins bins
                           :color "#3b6c9d"
                           :range [0 maxdt]}]
                   :size [700, 250]
                   :yscale "log"
                   :xlabel "Times between OFU status messages, seconds",
                   :ylabel "Entries"})))

(ofu-state-delta-times)
This bimodal distribution suggests something goofy. We can inspect the times by writing them back out to a file...

(defn write-ofu-state-times []
  (->> (day-msgs)
       (filter #(= (% "prio") 1))
       (filter #(= (% "cmd") "moni"))
       (filter #(= (% "service") "OpticalFollowUp"))
       (filter #(= ((% "payload") "varname") "state"))
       (map #(% "t"))
       (#(with-out-str (pprint %)))
       (spit "/tmp/ofutimes")))
Sometimes they look like this, i.e. once every 3 minutes...

"2013-11-20 23:00:57.297997"
"2013-11-20 23:03:58.387704"
"2013-11-20 23:06:58.671330"
"2013-11-20 23:09:58.722348"
"2013-11-20 23:13:00.788604"
"2013-11-20 23:16:03.245657"
"2013-11-20 23:19:04.784654"
"2013-11-20 23:22:04.845620"
"2013-11-20 23:25:05.534559"
"2013-11-20 23:28:07.422556"
"2013-11-20 23:31:08.565761"
"2013-11-20 23:34:09.676526"
... and sometimes like this: roughly 1 Hz on average, with gaps alternating between about 0.5 s and 1.7 s.

"2013-11-20 17:53:12.213043"
"2013-11-20 17:53:12.727890"
"2013-11-20 17:53:14.443550"
"2013-11-20 17:53:14.957570"
"2013-11-20 17:53:16.621265"
"2013-11-20 17:53:17.185204"
"2013-11-20 17:53:18.849326"
"2013-11-20 17:53:19.363238"
"2013-11-20 17:53:21.077113"
"2013-11-20 17:53:21.590343"
"2013-11-20 17:53:23.253939"
"2013-11-20 17:53:23.818978"
"2013-11-20 17:53:25.482208"
"2013-11-20 17:53:25.997032"
"2013-11-20 17:53:27.676178"
"2013-11-20 17:53:28.241808"
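The gaps in the two samples above can be checked directly. Below is a Python sketch that uses exactly the timestamps listed; the names slow, fast and gaps are hypothetical.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S.%f"

# Samples copied from the two listings above.
slow = [
    "2013-11-20 23:00:57.297997", "2013-11-20 23:03:58.387704",
    "2013-11-20 23:06:58.671330", "2013-11-20 23:09:58.722348",
    "2013-11-20 23:13:00.788604", "2013-11-20 23:16:03.245657",
    "2013-11-20 23:19:04.784654", "2013-11-20 23:22:04.845620",
    "2013-11-20 23:25:05.534559", "2013-11-20 23:28:07.422556",
    "2013-11-20 23:31:08.565761", "2013-11-20 23:34:09.676526",
]
fast = [
    "2013-11-20 17:53:12.213043", "2013-11-20 17:53:12.727890",
    "2013-11-20 17:53:14.443550", "2013-11-20 17:53:14.957570",
    "2013-11-20 17:53:16.621265", "2013-11-20 17:53:17.185204",
    "2013-11-20 17:53:18.849326", "2013-11-20 17:53:19.363238",
    "2013-11-20 17:53:21.077113", "2013-11-20 17:53:21.590343",
    "2013-11-20 17:53:23.253939", "2013-11-20 17:53:23.818978",
    "2013-11-20 17:53:25.482208", "2013-11-20 17:53:25.997032",
    "2013-11-20 17:53:27.676178", "2013-11-20 17:53:28.241808",
]


def gaps(stamps):
    """Seconds between consecutive timestamp strings, keeping microseconds."""
    ts = [datetime.strptime(s, FMT) for s in stamps]
    return [(b - a).total_seconds() for a, b in zip(ts, ts[1:])]


slow_gaps = gaps(slow)
fast_gaps = gaps(fast)
```

The first sample gives gaps of roughly 180 to 182.5 seconds. The second alternates between gaps of about 0.51 to 0.57 s and about 1.66 to 1.72 s, an average rate of about 0.94 Hz.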
Time dependence of the time difference:

(defn time-difference-vs-t []
  (let [t0 (->> (day-msgs)
                (first)
                (#(% "t"))
                time-for)
        points (->> (day-msgs)
                    (filter #(= (% "prio") 1))
                    (filter #(= (% "cmd") "moni"))
                    (filter #(= (% "service") "OpticalFollowUp"))
                    (filter #(= ((% "payload") "varname") "state"))
                    (map #(% "t"))
                    (partition 2 1)
                    (take-nth 10) ;; so the plot doesn't get too huge
                    (map (fn [[s1 s2]]
                           ;; time since start:
                           {:x (in-seconds (interval t0 (time-for s1)))
                            ;; time spread, in whole seconds (time-for
                            ;; drops fractions and in-seconds truncates):
                            :y (in-seconds (interval (time-for s1)
                                                     (time-for s2)))})))]
    (i3d3 "plot8" {:data [{:type "points"
                           :values points
                           :color "#3b6c9d"
                           :size 1.5}]
                   :size [700, 500]
                   :xlabel "t (seconds)",
                   :ylabel "delta-t"})))
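The point list in time-difference-vs-t pairs each timestamp with the next and keeps every tenth pair. For each kept pair, it records the time since t0 and the gap. Below is a Python sketch of that reduction, assuming a list of datetime objects. gap_vs_time and its every parameter are hypothetical. in-seconds truncates to whole seconds, but total_seconds() keeps fractions. Sub-second gaps would therefore not collapse to zero here, provided the timestamps keep their microseconds.

```python
from datetime import datetime


def gap_vs_time(times, t0, every=10):
    """(seconds since t0, seconds to the next time) for every `every`-th
    consecutive pair, like (partition 2 1) then (take-nth 10)."""
    pairs = list(zip(times, times[1:]))[::every]
    return [((a - t0).total_seconds(), (b - a).total_seconds())
            for a, b in pairs]


# Example: 24 timestamps one second apart give 23 pairs; keep pairs 0, 10, 20.
example_times = [datetime(2013, 11, 20, 0, 0, s) for s in range(24)]
points = gap_vs_time(example_times, example_times[0])
```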