rolisz's site

Summing up contacts

Last time we processed the Digsby, Trillian and Pidgin logs and saved them as a unified YAML file for each contact.

Now let's start seeing who I talk with most. A naive way to do this would be to simply sort the YAML files by file size. A small problem is that some contacts have longer IM names (such as thebestcatalin), while others have shorter ones (such as b0gdiy). That's a difference of 8 characters, so 8 bytes per line, which over, say, 100,000 lines exchanged adds up to 0.76 megabytes. A 100,000-line file is about 2 MB, so that would be an error of 38%.
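The back-of-the-envelope estimate can be checked in a few lines. This is only the arithmetic from the paragraph, under the same simplifications: one byte per character, the name showing up on every line, and a megabyte taken as 2**20 bytes (which is what gives the 0.76 figure).

```python
# Assumptions: 1 byte per character, the name appears on every line,
# and 1 MB = 2**20 bytes.
name_diff_bytes = len("thebestcatalin") - len("b0gdiy")
lines = 100_000
file_size_mb = 2  # rough size of a 100,000-line log, as stated in the text

extra_mb = name_diff_bytes * lines / 2**20
relative_error = extra_mb / file_size_mb

print(f"{extra_mb:.2f} MB extra, {relative_error:.0%} error")
# prints "0.76 MB extra, 38% error"
```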

And there are a few other reasons to iterate over each line separately to measure total talk counts. One is that this will be a general pattern in the future, where we iterate over every line to measure various things. The second is that we can already start verifying some hypotheses: whether girls really do talk more than boys. :D

We have to iterate over every file, load all the YAML documents in it, and then go over each YAML document, counting how many characters each line has. Just a quick reminder that last time we saved our files as lists containing sender name, date and message.

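import datetime
import os
from collections import defaultdict
import yaml

result = []
for file in os.listdir("logs"):
    size = defaultdict(int)  # characters exchanged, keyed by day
    with open(os.path.join("logs", file), encoding="utf-8") as f:
        for message in yaml.safe_load_all(f):
            try:
                date = datetime.datetime.strptime(message[0], "%Y-%m-%d %H:%M:%S")
            except ValueError:
                try:
                    # %p only changes the hour when paired with %I; we keep just the date anyway
                    date = datetime.datetime.strptime(message[0], "%Y-%m-%d %H:%M:%S %p")
                except ValueError:
                    print("This one ain't good: " + message[0])
                    continue
            size[date.strftime("%Y-%m-%d")] += len(message[2])
    # One entry per contact: file name, days talked, total characters
    result.append((file, len(size), sum(size.values())))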

What do we do here? For each YAML document, we try to figure out the date (because we forgot to normalize this in the previous part :-" ). The difference between the two formats is that one is 12 hour and the other is 24 hour. If it doesn't match either, we signal the error and move on.
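The nested try/except can also be flattened into a small helper that tries each format in turn. A minimal sketch: `parse_date` and `DATE_FORMATS` are names I made up for illustration, and the two format strings are the ones from the loop.

```python
import datetime

# Formats tried in order; both come from the parsing loop in this post.
DATE_FORMATS = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S %p")


def parse_date(text):
    """Return a datetime for `text`, or None if no known format matches."""
    for fmt in DATE_FORMATS:
        try:
            return datetime.datetime.strptime(text, fmt)
        except ValueError:
            continue  # try the next format
    return None


print(parse_date("2011-03-05 21:14:09"))
print(parse_date("2011-03-05 09:14:09 PM"))
print(parse_date("yesterday"))
```

Adding a third format later then means adding one string to the tuple instead of another level of nesting.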

Then we store the size of the message in a dictionary, using the date as the key (we keep only the date part, because the YAML stores full datetimes). After we have iterated over all the YAML documents in the log for one contact, we append a tuple to the result list: the name of the file (and implicitly, the name of the contact), the size of the dictionary (the number of days on which we talked) and the sum of all the values in the dictionary (how much we actually talked).
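To make the bookkeeping concrete, here is the same aggregation on three made-up messages. The sample data is hypothetical, and the (date, sender, message) order is my assumption based on the indices the loop uses.

```python
from collections import defaultdict

# Hypothetical sample; field order (date, sender, message) is assumed.
messages = [
    ("2011-03-05 21:14:09", "me", "hey"),
    ("2011-03-05 21:15:30", "b0gdiy", "hello there"),
    ("2011-03-06 08:01:00", "me", "morning"),
]

size = defaultdict(int)
for date, _sender, text in messages:
    size[date[:10]] += len(text)  # the first 10 characters are YYYY-MM-DD

days_talked = len(size)
chars_exchanged = sum(size.values())
print(dict(size), days_talked, chars_exchanged)
# prints "{'2011-03-05': 14, '2011-03-06': 7} 2 21"
```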

And now let's run this program to see whether it works or not. Patience. Patience. Patience. Tea (or coffee). Patience. Oh, after 5 minutes, it's done. That's ridiculous. Let's see if we can speed it up by making it use all our cores.

The naive attempt would be to use multithreading. Due to the way CPython (the official Python implementation) is written, there is a Global Interpreter Lock that prevents threads from running Python code simultaneously. So no multithreading. But fret not, for there is multiprocessing, another Python module, that enables us to use the full potential of our multi-core processor.

But it requires a few modifications to our program. Or it won't work. And that's the good case. The bad case being it spawns thousands of Python processes and crashes your laptop quicker than you can say Mississippi. Fun.

The biggest modification is that we will have to move the logic into a function, change the way it retrieves the name of the file and add some caller code for that function.

Let's start with the boilerplate code:

import os
import time
from multiprocessing import Process, Queue

if __name__ == '__main__':
    start_time = time.time()

    folder = "logs"

    q = Queue()   # file names waiting to be processed
    rq = Queue()  # results coming back from the workers
    for i in os.listdir(folder):
        q.put(i)
    processes = [Process(target=parseFile, args=(q, rq)) for i in range(7)]

    for p in processes:
        p.start()
    results = []
    # Drain the result queue while the workers are still running
    while not rq.empty() or not q.empty():
        if not rq.empty():
            results.append(rq.get(block=False))
        else:
            time.sleep(5)
    for p in processes:
        p.join()

    print(time.time() - start_time, "seconds")

Queue (from the multiprocessing module) is a special type of queue that can handle access from multiple processes. We make two queues: a consumer queue, where we will put the files that are to be processed, and a producer queue, where our function will put the results. Maybe there is a more efficient or simpler way of putting the file names into the queue, other than iterating over them, but I couldn't find it.
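On the question of a simpler way to hand out the file names: one option is multiprocessing.Pool, which manages the task and result queues internally. The sketch below is not what the script in this post does. `count_chars` and `run` are hypothetical names, and `count_chars` only measures the length of the file name so the example stays self-contained; the real version would load the YAML and sum the message lengths.

```python
from multiprocessing import Pool


def count_chars(name):
    # Hypothetical stand-in for the real per-file work.
    return name, len(name)


def run(names, workers=7):
    # Pool.map splits `names` across the worker processes and
    # returns the results in the same order as the input.
    with Pool(processes=workers) as pool:
        return pool.map(count_chars, names)


if __name__ == "__main__":
    print(run(["thebestcatalin.yaml", "b0gdiy.yaml"], workers=2))
```

The trade-off is less control: with a Pool you don't see the queues at all, which is fine for a job like this where each file is independent.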

Then we create a list of processes. Because I have 8 cores (4x2 with hyperthreading), I want to make 7 processes so that I can still use my computer while it works. The Process class takes the function to run as its target and the arguments it should pass to that function. In our case we pass it the two queues.

After we start each process, we start emptying our result queue. We have to do this before our processes finish, because a process that has put items on a queue will not terminate until those items have been flushed to the underlying pipe, so joining it while the result queue is still full can block forever. So we check every 5 seconds for new things in our queue. After both queues have been emptied, we wait for all processes to finish (even though they all should be done by now). And then we show off our ~~scores~~character counts.
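Polling both queues works, but there is a window where the task queue is already empty while a worker is still busy with its last file. One way around that, sketched here as an alternative and not as the code this post uses, is to count: every file name produces exactly one result, so the parent can do a blocking get() once per file and only then join. The names `worker` and `collect` are made up, the worker again just measures the file name, and a None sentinel per worker tells it when to stop.

```python
from multiprocessing import Process, Queue


def worker(tasks, results):
    # Blocking get(); a None sentinel means there is no more work.
    for name in iter(tasks.get, None):
        results.put((name, len(name)))  # hypothetical stand-in for parsing


def collect(names, workers=2):
    tasks, results = Queue(), Queue()
    for name in names:
        tasks.put(name)
    for _ in range(workers):
        tasks.put(None)  # one stop sentinel per worker
    procs = [Process(target=worker, args=(tasks, results)) for _ in range(workers)]
    for p in procs:
        p.start()
    # Exactly one result per task, so this drains the queue before join().
    out = [results.get() for _ in names]
    for p in procs:
        p.join()
    return out


if __name__ == "__main__":
    print(sorted(collect(["thebestcatalin.yaml", "b0gdiy.yaml"])))
```

The catch is that if a worker dies on a bad file, the parent waits forever for a result that never arrives, so the real parsing would need to catch its own errors and still put something on the result queue.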

We also do a bit of measurement to see how long the script takes. Hint: still a lot.

Our function is modified only a little:

from queue import Empty  # raised by get(block=False) on an empty queue

def parseFile(queue, result_queue):
    while True:
        try:
            file = queue.get(block=False)

This goes at the beginning, and this goes at the end:

            result_queue.put((file, days, meCount, otherCount, ratio))
        except Empty:
            break

And with this we get some pretty numbers. I'll leave the part of separately counting how much you and your contact each talk as homework. Or check the GitHub.
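If you want a starting point for the homework, here is one possible shape for it. It is a sketch and not the version from the repository: `split_counts` is a made-up name, the (date, sender, message) field order is assumed, and defining the ratio as my characters divided by theirs is my own guess at what the ratio in the result tuple means.

```python
def split_counts(messages, my_name):
    """Return (my_chars, other_chars, ratio) for one contact's messages."""
    me = sum(len(text) for _date, sender, text in messages if sender == my_name)
    other = sum(len(text) for _date, sender, text in messages if sender != my_name)
    # Assumed definition: my characters per character of theirs.
    ratio = me / other if other else None
    return me, other, ratio


# Hypothetical sample data.
sample = [
    ("2011-03-05 21:14:09", "me", "hey"),
    ("2011-03-05 21:15:30", "b0gdiy", "hello there"),
    ("2011-03-06 08:01:00", "me", "morning"),
]
me_count, other_count, ratio = split_counts(sample, "me")
print(me_count, other_count)
# prints "10 11"
```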