twisted adbapi: nomes de colunas em query

Quem já usou o twisted.enterprise.adbapi deve ter notado a falta de uma funcionalidade muitas vezes necessária na execução de queries (SELECT) no banco, seja ele qual for: colocar o nome das colunas no resultado, de forma que cada linha seja um dicionário e não um simples set.

Considerando que o adbapi é apenas um wrapper do Python Database API v2.0 (PEP-249), obviamente existem motivos pra essa funcionalidade não estar lá (além da preguiça de alguns). No documento, o primeiro item do FAQ:

Question: 

       How can I construct a dictionary out of the tuples returned by
       .fetch*():

    Answer:

       There are several existing tools available which provide
       helpers for this task. Most of them use the approach of using
       the column names defined in the cursor attribute .description
       as basis for the keys in the row dictionary.

       Note that the reason for not extending the DB API specification
       to also support dictionary return values for the .fetch*()
       methods is that this approach has several drawbacks:

       * Some databases don't support case-sensitive column names or
         auto-convert them to all lowercase or all uppercase
         characters.

       * Columns in the result set which are generated by the query
         (e.g.  using SQL functions) don't map to table column names
         and databases usually generate names for these columns in a
         very database specific way.

       As a result, accessing the columns through dictionary keys
       varies between databases and makes writing portable code
       impossible.

Na busca por uma solução, até encontrei um patch pro twisted, que adiciona um método runQueryMapped e retorna uma lista de dicionários, como eu queria. Porém, aplicar patch no twisted é furada, pois o código só funcionaria nas máquinas cujo twisted tem o tal patch. Fora de cogitação.

A solução mais simples (e tosca) que encontrei foi a que funcionou melhor:

# coding: utf-8
# hack for twisted.enterprise.adbapi.ConnectionPool class, providing
# a new method mapQuery (just like runQuery) whose return value
# is a list of dictionaries that map column names to values.

from twisted.enterprise import adbapi

class hackPool(adbapi.ConnectionPool):
    def _mapQuery(self, trans, *args, **kw):
        trans.execute(*args, **kw)
        rs, new_rs = trans.fetchall(), []
        names = [d[0] for d in trans.description]
        for values in rs:
            row = dict()
            for k, v in zip(names, values):
                row[k] = v
            new_rs.append(row)
        return new_rs

    def mapQuery(self, *args, **kw):
        return self.runInteraction(self._mapQuery, *args, **kw)

Dessa maneira, o hack fica no próprio código e não requer nenhum patch. Dá-lhe gambi.

Anúncios

yeah, tudo assíncrono!

Ultimamente tem sido tudo assim, assíncro. O Nuswit já vai fazer aniversário de 1 ano, e vale lembrar que está em produção contínua, sem dar nenhuma manutenção.

Já estou mais que convencido que o caminho pros próximos anos dessa década não pode ser outro, ainda mais com o WebSocket no w3c.

Para contribuir com a interwebs, tenho mantido os seguintes projetos:

http://github.com/fiorix/cyclone
Um clone do Tornado, webserver assíncrono do FriendFeed, que desde o ano passado é do Facebook. Essa implementação, batizada de Cyclone, tem algumas diferenças:

  • Core I/O baseado no Twisted
  • Suporte nativo a XMLRPC
  • Suporte a localização baseada no gettext – ao invés do CSV, original do Tornado

Com vários aplicativos de exemplos, todos os plug-ins do Tornado para autenticação no Google, Twitter, Facebook, OAuth, OpenID, etc…

O RestMQ (coisas do Gleicon, que ajudei a implementar) é baseado nele. A nova versão do Nuswit também será.

http://github.com/fiorix/twisted-twitter-stream
Uma API bem simples para acessar a Streaming API do Twitter. Provê suporte a todos os métodos publicados pela API.

Não depende do TwistedWeb, a implementação do HTTP 1.1 está inteira no código – na verdade, apenas o lado do client com suporte a Comet.

Permite criar sistemas como este.

http://github.com/fiorix/txredisapi
Um driver assíncrono pro Redis, também baseado no Twisted. O protocolo de comunicação já existia, mas era carente de algumas coisas, que implementei:

Além de estável, é muito rápido! Também foi usado no RestMQ, e aparentemente, está se tornando popular. Hoje achei algumas referências enquanto procurava no Google.

http://github.com/fiorix/mongo-async-python-driver
Outro driver de banco de dados, pro MongoDB. O driver original para Python é síncrono, o que dificulta (embora não impossibilita) de usar em sistemas assíncronos, especialmente baseados no Twisted.

Boa parte da implementação é baseada no pymongo original, inclusive o codec de BSON (em C), formato binário usado pelo Mongo, baseado em JSON.

Provavelmente se tornará o driver assíncrono oficial do Mongo para Python+Twisted, e está em vias de se tornar estável – isso devido às várias mudanças na API, e implementação de vários recursos incluindo Lazy Connections, e Document Reference.

Também, já tem algumas pessoas de olho no GitHub, acompanhando o desenvolvimento.

Entre os vários dbs nosql (couch, redis, etc) o Mongo é um dos mais completos, com uma linguagem de query muito decente, entre os vários outros recursos nativos. O fato de usar mmap para acessar os dados também faz com que ele seja muito rápido.


mongodb e twisted

Há algum tempo venho fazendo testes com o MongoDB pra casos específicos onde um RDBM tradicional como MySQL ou PostgreSQL não se encaixa muito bem.

Um dos casos onde um banco de dados baseado em objetos como o MongoDB se encaixa perfeitamente, é em um dos meus sistemas comerciais de telefonia, o Nuswit.

Lá, o usuário pode criar uma campanha de tele mensagem e colocar variáveis, que serão usadas para ligar para as pessoas e falar algumas coisas dinâmicas, sintetizando o texto em voz. Hoje, cada vez que o usuário cria uma campanha, pode importar uma planilha ou arquivo CSV, e então o sistema uma cria nova tabela no Sqlite com os campos que o usuário definiu na campanha, de acordo com essas tais variáveis.

Por isso, não é possível ter uma tabela estática, muito menos fazer relacionamentos pra usar a tabela no estilo chave=valor, pois essa mesma tabela é usada pros relatórios que o usuário baixa após o término da campanha.

Nesse caso, o MongoDB se encaixa perfeitamente. É muito mais simples criar uma coleção de dados com o mesmo nome da campanha, e importar documentos tipo JSON (nome=x, telefone=y, cpf=z) do que criar uma nova tabela com esses campos.

Além do mais, a API do pymongo é muito mais decente do que qualquer coisa parecida com SQL, pois os databases e collections são objetos do Python.

O único problema com o pymongo é que ele foi feito pra controlar e manter um pool de conexões com o banco, totalmente síncrono. Pra usar o pymongo em servidores como os que tenho feito ultimamente, assíncronos, baseados em Twisted, é necessário mandar todas as chamadas do banco pra um thread (usando callInThread ou deferToThread).

Pra solucionar esse problema, passei a frequentar o #mongodb na freenode, e em contato com o autor do pymongo, acabei criando uma versão assíncrona do driver, baseado em Twisted, que mantém o mesmo estilo da API original.

Agora, a integração entre Twisted e MongoDB está muito mais decente, usando pymonga.


mais sobre crawlers e spiders

logo_mercadolivreNo mês passado escrevi um artigo com um programa para capturar todos os items da primeira página de cada categoria do MercadoLivre.

Lá, lidava com alguns problemas como:

  • limite de concorrência no download das páginas
  • processamento de html em thread, síncrono
  • manter a maior parte do processo assíncrono, para ganhar tempo e CPU

Depois disso, precisei fazer umas alterações no código e acabei modificando um pouco programa, usando outras técnicas como:

Cada item desta lista corresponde aos itens da lista mais acima, respectivamente.

O esquema de cooperação do twisted é muito melhor que o Controller que havia criado anteriormente. Porém, muito mais complicado para jovens aprendizes. Recomendo este link para mais detalhes.

Sobre o processamento do html, vale a pena verificar o lxml. Antes, havia usado BeautifulSoup, que é muito bom, mas perde violentamente em desempenho e suporte a broken-html.

Por fim, o truque de usar generators para executar alguns callbacks inline é incrível, e absurdamente prático em casos como esse, do programa abaixo.

O resultado é final é o mesmo, mas a melhoria em desempenho é absurda. Fiz alguns testes na minha máquina e obtive o seguinte:

  • esta versão consome, em média, 20% menos de CPU
  • como não há necessidade de gravar os arquivos no disco, não consome disco
  • o processo todo ficou 657% mais rápido, simplesmente
  • ainda, o código é muito menor +_+

Veja ai:

#!/usr/bin/env python
# coding: utf-8

from lxml import html
from twisted.web import client
from twisted.python import log
from twisted.internet import task, defer, reactor

class MercadoLivre:
    def __str__(self):
        return 'http://www.mercadolivre.com.br/jm/ml.allcategs.AllCategsServlet'

    def parse_categories(self, content):
        category = subcategory = ''
        doc = html.fromstring(content)
        for link in doc.iterlinks():
            el, attr, href, offset = link
            try: category = el.find_class('categ')[0].text_content()
            except: pass
            else: continue
            if category:
                try: subcategory = el.find_class('seglnk')[0].text_content()
                except: continue
                else: yield (href, category, subcategory)

    def parse_subcategory(self, content):
        doc = html.fromstring(content)
        for element in doc.find_class('col_titulo'):
            yield element[0].text_content()

class Engine:
    def finish(self, result):
        reactor.stop()

    @defer.inlineCallbacks
    def fetch_categories(self, link, parser):
        try:
            doc = yield client.getPage(link)
            defer.returnValue(parser(doc))
        except Exception, e:
            print e

    def fetch_subcategory(self, links, parser, limit):
        coop = task.Cooperator()
        work = (client.getPage(link[0]).addCallback(parser).addCallback(self.page_items, *link) for link in links)
        result = defer.DeferredList([coop.coiterate(work) for x in xrange(limit)])
        result.addCallback(self.finish)
        result.addErrback(log.err)

    def page_items(self, items, href, category, subcategory):
        print 'Categoria: %s / %s' % (category.encode('utf-8'), subcategory.encode('utf-8'))
        for item in items: print ' -> %s' % item.encode('utf-8')
        print ''

def main(limit, *parsers):
    e = Engine()
    for parser in parsers:
        links = e.fetch_categories(str(parser), parser.parse_categories)
        links.addCallback(e.fetch_subcategory, parser.parse_subcategory, limit)

if __name__ == '__main__':
    reactor.callWhenRunning(main, 150, MercadoLivre())
    reactor.run()

twisted crawler, alvo: mercadolivre

logo_mercadolivreJá pensou em fazer um programa que acessa o Mercado Livre, identifica o link de cada categoria, e extrai todos os produtos da primeira página de cada uma dessas categorias?

Pode até parecer complexo, mas não é. Esse programa existe, é simples, e está aqui, neste artigo, pronto pra você testar e modificar. :)

A algumas semanas tenho feito alguns crawlers pra esses sites que vendem produtos, como o Web Motors, Submarino, e até mesmo o eBay. A idéia é simplesmente extrair todos os produtos da primeira página de cada categoria, e montar uma base de dados com essas informações. O objetivo? Segredo de Estado.

Um dos problemas que encontrei ao fazer esse tipo de crawler, é controlar a concorrência de acesso. Se o programa acessa uma categoria por vez, demora uma eternidade pra baixar e processar cada página. Por outro lado, se ele acessa todas as categorias descontroladamente, o processo acaba criando muitos File Descriptors e isso causa diversos outros problemas pro sistema operacional – e não adianta falar em aumentar o limite usando ulimit -n, porque esses crawlers têm baixado milhões de links por dia.

A melhor maneira que encontrei pra solucionar esse problema, foi criando uma classe chamada “Controller”, que coloca as requisições em uma fila, baixa N páginas por vez, e ao invés de processá-las, simplesmente salva em um arquivo no disco.

Todos esses arquivos, com nomes únicos, são gerados usando um hash SHA1, e colocados em uma lista para serem processados offline, após o processo de download terminar.

O twisted tem um recurso muito interessante, que permite agrupar diversos deferreds em um único, e quando todos eles terminam, executa um callback. Nesse caso, quando todos os processos de baixar a página da subcategoria terminam, executa a função “offline”, que começa a processar cada uma, e extrair os produtos de lá.

Esta segunda etapa também poderia ser feita em paralelo, usando o twisted-parallels, mas decidi não colocar porque o código ficaria muito maior e você não teria tanta paciência pra entendê-lo. Usar um thread pool pra isso não valeria a pena, pois vale lembrar que o python usa o GIL, e I/O em thread só consome recurso e não aumenta em nada o desempenho.

A classe “MercadoLivre” tem 3 funções:

  • __str__: retorna o link com todas as categorias do Mercado Livre
  • parse_categories: um parser que retorna uma lista composta por tuples de (url, nome)
  • parse_subcategory: um parser pro conteúdo de cada categoria, que retorna uma lista com o título dos produtos anunciados lá

Todos esses parsers são baseados no BeautifulSoup 3.1. É necessário tê-lo instalado pra usar o programa.

Por fim, para usar este programa em outros sites, basta substituir a classe “MercadoLivre” pela sua própria, tipo “Submarino”.

Detalhe: essa porcaria de WordPress mostra a identação do código errada no artigo, mas se você clicar em “view plain”, poderá copiar e colar o código correto. Chame o suporte!

Aqui o código (muito belo, por sinal…):

#!/usr/bin/env python
# coding: utf-8

import os, re, sys
import shutil, hashlib
from Queue import Queue
from BeautifulSoup import BeautifulSoup

# twisted
from twisted.web import client
from twisted.internet import defer, threads, reactor

class MercadoLivre:
noscript = re.compile(r”(?is)]*>(.*?)“)

def __str__(self):
return ‘http://www.mercadolivre.com.br/jm/ml.allcategs.AllCategsServlet’

def parse_categories(self, content):
catlist = {}
cleanup = lambda s: s.replace(‘\n’, ”).strip()

# parse the document
soup = BeautifulSoup(content)

# find all categories and their items
current = ”
for item in soup.findAll(‘a’, {‘class’:[‘categ’,’seglnk’]}):
text = cleanup(item.contents[0])
attrs = dict(item.attrs)
if attrs.get(u’class’) == u’categ’:
current = text
catlist[current] = []
else:
catlist[current].append((attrs.get(u’href’, ”), text))

# return the list of categories and their items
return catlist

def parse_subcategory(self, content):
result = []
soup = BeautifulSoup(self.noscript.sub(”, content),
convertEntities=BeautifulSoup.HTML_ENTITIES)

for item in soup.findAll(‘div’, {‘class’:’col_titulo’}):
result.append(item.find(‘a’).contents[0].strip())

return result

class Controller:
def __init__(self, fetch):
self.count = 0
self.limit = fetch
self.queue = Queue()
self.tmpdir = ‘/tmp/mercadolivre.%d’ % os.getpid()
self.dispatch()

def encode(self, text):
return unicode(text).encode(‘utf-8’, ‘replace’)

def getPage(self, url, *args, **kwargs):
d = defer.Deferred()
self.queue.put((d, url, args, kwargs))
return d

def dispatch(self):
while True:
try:
assert self.count < self.limit d, url, args, kwargs = self.queue.get_nowait() except: break self.count += 1 deferred = client.getPage(url, *args, **kwargs) deferred.addBoth(self.decrease_count) deferred.chainDeferred(d) reactor.callLater(1, self.dispatch) def decrease_count(self, result): self.count -= 1 return result class main(Controller): def __init__(self, fetch, parser): # set the concurrent download limit for the crawler Controller.__init__(self, fetch) # fetch the main categories page self.total = 0 self.files = [] self.parser = parser d = self.getPage(str(parser), timeout=60) d.addCallback(self.fetch_categories) d.addErrback(self.error_categories) def error_categories(self, error): # hmmm... fatal error, cannot continue print 'cannot fetch categories from %s: %s' % (str(self.parser), str(error)) reactor.stop() def error_subcategory(self, error, href, category, subcategory): # problem fetching subcategory contents... print 'error "%s / %s": [%s] %s' % (category, subcategory, href, error.value) def fetch_categories(self, content): # parse the contents in a thread reactor.callInThread(self.parse_categories, content) def parse_categories(self, content): try: categories = self.parser.parse_categories(content) except Exception, e: print 'error parsing categories: %s' % str(e) reactor.stop() return print 'going to fetch %d categories...' % len(categories) tasks = [] for category, contents in categories.items(): category = self.encode(category) for href, subcategory in contents: href, subcategory = self.encode(href), self.encode(subcategory) d = self.getPage(href, timeout=60) d.addCallback(self.save_subcategory, href, category, subcategory) d.addErrback(self.error_subcategory, href, category, subcategory) tasks.append(d) # call the offline subcategory parser after downloading everything... d = defer.gatherResults(tasks) d.addCallback(self.offline) def save_subcategory(self, contents, href, category, subcategory): # create the tmpdir if it doesn't exist if not os.path.exists(self.tmpdir): os.mkdir(self.tmpdir) # create a unique hash for each category hash = hashlib.new('sha1', category+subcategory).hexdigest() filename = os.path.join(self.tmpdir, hash+'.dump') try: fd = open(filename, 'w') fd.write(contents) fd.close() except: return self.files.append((href, category, subcategory, filename)) print 'saving "%s / %s": %s' % (category, subcategory, hash) def offline(self, null): # start processing each subcategory... reactor.stop() for item in self.files: href, category, subcategory, filename = item sys.stdout.write('parsing "%s / %s": ' % (category, subcategory)) sys.stdout.flush() fd = open(filename) try: results = self.parser.parse_subcategory(fd.read()) except Exception, e: print 'error! %s' % str(e) continue lr = len(results) self.total += lr print '%d items' % lr for result in results: print ' ' + self.encode(result) print '\n%d items processed. cleaning up!' % self.total shutil.rmtree(self.tmpdir) if __name__ == '__main__': reactor.callWhenRunning(main, 10, MercadoLivre()) reactor.run() [/sourcecode] Agora, é só você se dedicar e fazer isso pros sites que te interessam. Notas?


paralelismo: python e twisted

parent-childExecutar algumas tarefas em paralelo no python, usando threads, pode ser problemático, especialmente quando há I/O envolvido.

Apesar do interpretador do python usar as threads nativas do sistema operacional (na maioria dos *nix, pthreads), ele não é 100% thread-safe. Isso não é um bug, mas sim um recurso usado para evitar travamento ou computação incorreta de valores.

Do texto original:

The Python interpreter is not fully thread safe. In order to support multi-threaded Python programs, there’s a global lock, called the global interpreter lock or GIL, that must be held by the current thread before it can safely access Python objects. Without the lock, even the simplest operations could cause problems in a multi-threaded program: for example, when two threads simultaneously increment the reference count of the same object, the reference count could end up being incremented only once instead of twice.

Therefore, the rule exists that only the thread that has acquired the global interpreter lock may operate on Python objects or call Python/C API functions. In order to support multi-threaded Python programs, the interpreter regularly releases and reacquires the lock — by default, every 100 bytecode instructions (this can be changed with sys.setcheckinterval()). The lock is also released and reacquired around potentially blocking I/O operations like reading or writing a file, so that other threads can run while the thread that requests the I/O is waiting for the I/O operation to complete.

Do outro lado, temos o Twisted. Para mim, o Twisted é uma super biblioteca de I/O, com todos os recursos necessários para tratar tarefas de modo assíncrono (non-blocking), considerando ainda aquelas que bloqueiam a execução por determinado tempo (blocking).

Internamente, o Twisted mantém um thread pool para executar as tarefas síncronas, como por exemplo executar um INSERT em banco de dados. Para tal, existem as funções threads.deferToThread e reactor.callInThread.

Porém, se considerarmos o GIL, mencionado acima, temos como resultado um belo problema: enquanto uma tarefa que necessita I/O síncrono está sendo executada em uma thread, o interpretador do python fica bloqueado naquela operação e todo o resto fica parado. Isso causa uma perda de desempenho sem tamanho, e diversos outros efeitos colaterais (como time-out em sockets, etc).

Solucionar o problema de paralelismo no python não é tão complicado, mas também não é tão simples. Existem dois módulos que conheço, que fazem isso: pyprocessing e multiprocessing.

Ambos usam uma API similar à do módulo threading, mas ao invés de threads, criam processos usando fork(), que por sua vez, executam outro interpretador do python e se livram dos efeitos do GIL em um único processo. Mas, quando usados em conjunto com o Twisted, necessitam diversas adaptações para a comunicação entre os processos pai e filhos, pois essa comunicação é feita através de pipes.

Com tudo isso junto, o GIL começa a se tornar um problema e a coisa toda já se parece com uma grande confusão, que para muitos já parece não ter solução decente. Em suma, é o seguinte: escrever programas que necessitam parelelismo em python exige usar pyprocessing ou multiprocessing, mas quando o programa é inteiro assíncrono usando Twisted, tudo fica complicado.

Isso obviamente aconteceu comigo em um sistema relativamente grande, que precisava executar a classificação de alguns dados em paralelo, e quando usava o thread pool do Twisted, o processo inteiro ficava lento devido à grande quantidade de I/O para ler e gravar arquivos no disco. Era o GIL me atrapalhando.

Para solucionar esse problema, escrevi um módulo que usa a função spawnProcess do próprio Twisted, e automaticamente trata do pipe entre os processos pai e filho. Ainda, escrevi um protocolo de comunicação entre eles que permite transmitir e receber dados entre os processos de modo transparente, de maneira assíncrona.

Ainda, esse módulo possui uma classe que cria um pool de processos (não threads) para onde é possível despachar dados para serem processados em paralelo, e aguardar pelo resultado em um deferred, seguindo todo o padrão do Twisted.

Depois de executar todos os testes necessários e deixar o código estável, implementei isso no meu sistema e resolvi todo o problema do paralelismo de maneira simples e elegante, sem nenhuma gambiarra nem esquisitisse.

Denominado Twisted-Parallels, o módulo foi liberado sob a GPL v2 e está disponível no Google Code, com alguns exemplos de utilização.


freeswitch, eventsocket, twisted!

freeswitchApós meses tentando resolver problemas ridículos com meu discador automático baseado em Asterisk, acabei mudando totalmente o rumo do sistema e re-escrevi todo o código para o FreeSWITCH.

A versão anterior se conectava no manager do Asterisk, originava as chamadas, e naquela completa bagunça sem padrão ia controlando o fluxo da chamada com auxílio de um AGI. Funções como transferência e gravação com detecção de caixa postal geravam diversas complicações devido à maneira como o Asterisk funciona, e o resultado era muita dor de cabeça.

Agora, a bagunça é assíncrona, tem padrão, e controla totalmente o FreeSWITCH através do Event Socket. Ao se conectar no Inbound Socket, o sistema origina as chamadas com destino ao Outbound Socket, onde cada uma é controlada individualmente e todos recursos funcionam perfeitamente, inclusive as várias maneiras de realizar transferência (bridge).

O FreeSWITCH está próximo da perfeição no quesito funcionamento, pecando apenas na documentação e alguns detalhes relacionados ao retorno (erro, etc) na originação de chamadas. Porém, esses detalhes são totalmente contornáveis com soluções simples, sem gambiarras e maracutaias como era com o Asterisk.

Em uma máquina comum, consigo manter 60 ligações simultâneas consumindo apenas 3% de cpu time. E o FreeSWITCH, que faz todo o controle das chamadas – conversão de mídia, gravação, detecção de caixa postal, de fax, etc etc etc, consome apenas 30% de cpu time.

Tudo isso é feito usando um protocolo do Twisted pro Event Socket do FreeSWITCH, que escrevi no último final de semana – entre carnaval, rock, etc. E pra felicidade geral da nação, tornei público pela lisença GPLv2, disponível no Google Code com exemplos.