2009-10-01 119 views
0

我正在学习Python,并且通过在线资源和此站点上的人员的帮助,我正在掌握它。在我的第一个脚本中,我正在解析Twitter RSS源条目并将结果插入到数据库中,还有一个我无法修复的问题。即,重复的条目正被插入到其中一个表中。使用sqlite,sqlalchemy,python在数据库中重复插入

作为一个背景,我最初在HalOtis.com上发现了一个用于下载RSS源的基本脚本,然后通过几种方式对其进行了修改:1)修改了Twitter RSS源中的特性(未将其分离为内容,标题,网址等); 2)添加了“hashtags”和多对多关系(entry_tag表)的表格; 3)将表格设置更改为sqlalchemy; 4)进行了一些临时更改,以解决正在发生的奇怪unicode问题。因此,代码在各个地方都很难看,但它已经成为一种很好的学习体验,现在可以工作 - 除了它会在“条目”表中插入重复内容。

因为我不确定什么对人最有帮助,所以我粘贴了下面的所有代码,并在一些地方发表了一些评论,指出我认为最重要的内容。

我真的很感谢任何帮助。谢谢!

编辑:有人建议我为数据库提供模式。我从来没有这样做过,所以如果我做得不对,请忍受我。我设置了四个表:

  1. RSSFeeds,其中包含Twitter的RSS Feed列表
  2. RSSEntries,其中包含从每个饲料的下载(分析后)个别条目的列表(与内容列,井号标签,日期,URL)
  3. 标签,它包含了所有在每个条目(鸣叫)
  4. entry_tag,其中包含让我的标签映射到项列中发现的井号标签的列表。

总之,以下脚本从RSS Feeds表中抓取了五个测试RSS源,从每个源下载20个最新的条目/推文,解析条目,并将信息放入RSS条目,标签,和entry_tag表。

#!/usr/local/bin/python 

import sqlite3 
import threading 
import time 
import Queue 
from time import strftime 
import re  
from string import split 
import feedparser 
from django.utils.encoding import smart_str, smart_unicode  
from sqlalchemy import schema, types, ForeignKey, select, orm 
from sqlalchemy import create_engine 

engine = create_engine('sqlite:///test98.sqlite', echo=True) 
metadata = schema.MetaData(engine) 
metadata.bind = engine 

def now(): 
    return datetime.datetime.now() 


#set up four tables, with many-to-many relationship 
RSSFeeds = schema.Table('feeds', metadata, 
    schema.Column('id', types.Integer, 
     schema.Sequence('feeds_seq_id', optional=True), primary_key=True), 
    schema.Column('url', types.VARCHAR(1000), default=u''), 
) 


RSSEntries = schema.Table('entries', metadata, 
    schema.Column('id', types.Integer, 
     schema.Sequence('entries_seq_id', optional=True), primary_key=True), 
    schema.Column('feed_id', types.Integer, schema.ForeignKey('feeds.id')), 
    schema.Column('short_url', types.VARCHAR(1000), default=u''), 
    schema.Column('content', types.Text(), nullable=False), 
    schema.Column('hashtags', types.Unicode(255)), 
    schema.Column('date', types.String()), 
) 


tag_table = schema.Table('tag', metadata, 
    schema.Column('id', types.Integer, 
     schema.Sequence('tag_seq_id', optional=True), primary_key=True), 
    schema.Column('tagname', types.Unicode(20), nullable=False, unique=True), 
) 


entrytag_table = schema.Table('entrytag', metadata, 
    schema.Column('id', types.Integer, 
     schema.Sequence('entrytag_seq_id', optional=True), primary_key=True), 
    schema.Column('entryid', types.Integer, schema.ForeignKey('entries.id')), 
    schema.Column('tagid', types.Integer, schema.ForeignKey('tag.id')), 
) 


metadata.create_all(bind=engine, checkfirst=True) 


# Insert test set of Twitter RSS feeds 
stmt = RSSFeeds.insert() 
stmt.execute(
    {'url': 'http://twitter.com/statuses/user_timeline/14908909.rss'}, 
    {'url': 'http://twitter.com/statuses/user_timeline/52903246.rss'}, 
    {'url': 'http://twitter.com/statuses/user_timeline/41902319.rss'}, 
    {'url': 'http://twitter.com/statuses/user_timeline/29950404.rss'}, 
    {'url': 'http://twitter.com/statuses/user_timeline/35699859.rss'}, 
) 



#These 3 lines for threading process (see HalOtis.com for example) 
THREAD_LIMIT = 20 
jobs = Queue.Queue(0) 
rss_to_process = Queue.Queue(THREAD_LIMIT) 


#connect to sqlite database and grab the 5 test RSS feeds 
conn = engine.connect() 
feeds = conn.execute('SELECT id, url FROM feeds').fetchall() 

#This block contains all the parsing and DB insertion 
def store_feed_items(id, items): 
    """ Takes a feed_id and a list of items and stores them in the DB """ 
    for entry in items: 
     conn.execute('SELECT id from entries WHERE short_url=?', (entry.link,)) 
     #note: entry.summary contains entire feed entry for Twitter, 
        #i.e., not separated into content, etc. 
     s = unicode(entry.summary) 
     test = s.split() 
     tinyurl2 = [i for i in test if i.startswith('http://')] 
     hashtags2 = [i for i in s.split() if i.startswith('#')] 
     content2 = ' '.join(i for i in s.split() if i not in tinyurl2+hashtags2) 
     content = unicode(content2) 
     tinyurl = unicode(tinyurl2) 
     hashtags = unicode (hashtags2) 
     print hashtags 
     date = strftime("%Y-%m-%d %H:%M:%S",entry.updated_parsed) 


     #Insert parsed feed data into entries table 
        #THIS IS WHERE DUPLICATES OCCUR 
     result = conn.execute(RSSEntries.insert(), {'feed_id': id, 'short_url': tinyurl, 
      'content': content, 'hashtags': hashtags, 'date': date}) 
     entry_id = result.last_inserted_ids()[0] 


     #Look up tag identifiers and create any that don't exist: 
     tags = tag_table 
     tag_id_query = select([tags.c.tagname, tags.c.id], tags.c.tagname.in_(hashtags2)) 
     tag_ids = dict(conn.execute(tag_id_query).fetchall()) 
     for tag in hashtags2: 
      if tag not in tag_ids: 
       result = conn.execute(tags.insert(), {'tagname': tag}) 
       tag_ids[tag] = result.last_inserted_ids()[0] 

     #insert data into entrytag table 
     if hashtags2: conn.execute(entrytag_table.insert(), 
      [{'entryid': entry_id, 'tagid': tag_ids[tag]} for tag in hashtags2]) 


#Rest of file completes the threading process  
def thread(): 
    while True: 
     try: 
      id, feed_url = jobs.get(False) # False = Don't wait 
     except Queue.Empty: 
      return 

     entries = feedparser.parse(feed_url).entries 
     rss_to_process.put((id, entries), True) # This will block if full 

for info in feeds: # Queue them up 
    jobs.put([info['id'], info['url']]) 

for n in xrange(THREAD_LIMIT): 
    t = threading.Thread(target=thread) 
    t.start() 

while threading.activeCount() > 1 or not rss_to_process.empty(): 
    # That condition means we want to do this loop if there are threads 
    # running OR there's stuff to process 
    try: 
     id, entries = rss_to_process.get(False, 1) # Wait for up to a second 
    except Queue.Empty: 
     continue 

    store_feed_items(id, entries) 
+0

如果你提供了一个模式,这将有所帮助,所以我们不必从源代码中推断出它。 – Fragsworth 2009-10-01 19:54:09

+0

谢谢 - 我会在上面添加一些内容。 – 2009-10-01 20:39:31

回答

2

看起来你SQLAlchemy的包括成不使用SQLAlchemy以前存在的脚本。这里有太多的移动部件,我们没有一个明显理解得足够好。

我会建议从头开始。不要使用线程。不要使用sqlalchemy。要开始也许不要使用SQL数据库。使用简单的循环和可能的time.sleep(),将一个脚本收集到您想要的信息中,并以简单的方式将其收集到一个简单的数据结构中。然后,当它工作时,您可以将存储添加到SQL数据库中,并且我真的不认为直接编写SQL语句比使用ORM困难得多,并且更容易调试恕我直言。很有可能你永远不需要添加线程。

“如果您认为自己足够聪明,可以编写多线程程序,那么您就不是。” - James Ahlstrom。

相关问题