2016-12-14 60 views
1

出于某种原因,以下堆栈跟踪在启动该过程大约16小时或更长时间后首先显示,然后在每隔一小时或两小时后显示。我在其他服务器上每天,每小时和每几分钟都有其他的工作,他们没有得到这个堆栈跟踪。我可以看到唯一的区别是这个代码每个调度程序有多个作业,而其他的则不是。FreeTDS和PyODBC随机连接从服务器断开

堆栈跟踪:

Traceback (most recent call last): 
    File "app.py", line 24, in _run_client 
    h.run_hourly(start, end) 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/foo/client/bar/helper.py", line 153, in run_hourly 
    _run(_HOURLY_REQUEST, frequency, start, end, duration) 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/foo/client/bar/helper.py", line 117, in _run 
    bars = dr.get_bars(frequency) 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/foo/client/bar/data_requests.py", line 35, in get_bars 
    df = pd.read_sql(query, _ENGINE, params=params) 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/pandas/io/sql.py", line 415, in read_sql 
chunksize=chunksize) 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/pandas/io/sql.py", line 1084, in read_query 
    result = self.execute(*args) 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/pandas/io/sql.py", line 975, in execute 
    return self.connectable.execute(*args, **kwargs) 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 2055, in execute 
    return connection.execute(statement, *multiparams, **params) 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute 
    return meth(self, multiparams, params) 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection 
    return connection._execute_clauseelement(self, multiparams, params) 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement 
compiled_sql, distilled_params 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context 
context) 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1386, in _handle_dbapi_exception 
    self._autorollback() 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 824, in _autorollback 
    self._root._rollback_impl() 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 703, in _rollback_impl 
    self._handle_dbapi_exception(e, None, None, None, None) 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1315, in _handle_dbapi_exception 
    exc_info 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause 
    reraise(type(exception), exception, tb=exc_tb, cause=cause) 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 701, in _rollback_impl 
    self.engine.dialect.do_rollback(self.connection) 
    File "/home/foo/anaconda2/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 439, in do_rollback 
    dbapi_connection.rollback() 
DBAPIError: (pyodbc.Error) ('08S01', '[08S01] [FreeTDS][SQL Server]Write to the server failed (20006) (SQLEndTran)') 

的问候下面提到的版本的服务器匹配。

版本:
操作系统:Ubuntu的16.04.1
freetds的软件包版本:freetds的,常见的有:0.91-6.1build1,tdsodbc:AMD64:0.91-6.1build1 PyODBC:3.0.10
SQL炼金:1.1。 4

TSQL输出:

Compile-time settings (established with the "configure" script) 
         Version: freetds v0.91 
     freetds.conf directory: /etc/freetds 
MS db-lib source compatibility: no 
    Sybase binary compatibility: yes 
        Thread safety: yes 
        iconv library: yes 
        TDS version: 4.2 
          iODBC: no 
         unixodbc: yes 
      SSPI "trusted" logins: no 
         Kerberos: yes 

测试代码:

app.py

from datetime import datetime, timedelta 
from dateutil import relativedelta 
import traceback 

from apscheduler.schedulers.blocking import BlockingScheduler 
from bar import helper as h 


def _run_client(resolution): 
    try: 
     if resolution == "hourly": 
      # Had to create a temporary variable to make native datetimes 
      t = datetime.utcnow() - timedelta(hours=1) 
      end = datetime(t.year, t.month, t.day, t.hour) 
      start = end - timedelta(hours=1) 
      h.run_hourly(start, end) 
     elif resolution == "daily": 
      # Had to create a temporary variable to make native datetimes 
      t = datetime.utcnow().date() - timedelta(days=1) 
      end = datetime(t.year, t.month, t.day) 
      start = end - timedelta(days=1) 
      h.run_daily(start, end) 
     else: 
      # Had to create a temporary variable to make native datetimes 
      t = datetime.utcnow().date().replace(
       day=1) - relativedelta.relativedelta(months=1) 
      end = datetime(t.year, t.month, t.day) 
      start = end - relativedelta.relativedelta(months=1) 
      h.run_monthly(start, end) 
    except: 
     print "Current run failed:\n%s" % traceback.format_exc() 


def _get_hourly_job(sched): 
    args = ["hourly"] 
    job = sched.add_job(_run_client, args=args, trigger="cron", hour="*", minute="0") 
    return job 


def _get_daily_job(sched): 
    args = ["daily"] 
    job = sched.add_job(_run_client, args=args, trigger="cron", hour="4", minute="0") 
    return job 


def _get_monthly_job(sched): 
    args = ["monthly"] 
    job = sched.add_job(_run_client, args=args, trigger="cron", day="1", hour="0", minute="0") 
    return job 

if __name__ == "__main__": 
    sched = BlockingScheduler() 
    hourly_job = _get_hourly_job(sched) 
    daily_job = _get_daily_job(sched) 
    monthly_job = _get_monthly_job(sched) 

    try: 
     sched.start() 
    except: 
     # Remove the jobs from memory since they finished 
     hourly_job.remove() 
     daily_job.remove() 
     monthly_job.remove() 

     sched.shutdown() 

helper.py

from datetime import timedelta 

import data_requests as dr 

_HOURLY_REQUEST = Foo() 
_HOURLY_REQUEST.resolution = "hourly" 

_DAILY_REQUEST = Foo() 
_DAILY_REQUEST.resolution = "daily" 

_MONTHLY_REQUEST = Foo() 
_MONTHLY_REQUEST.resolution = "monthly" 


def _run(request, frequency, start, end, duration): 
    bars = dr.get_bars(frequency) 

    if bars.empty: 
     return None 
    print "Bars = %i" % len(bars) 


def run_daily(start, end): 
    frequency = 86400 
    duration = timedelta(hours=4) 
    _run(_DAILY_REQUEST, frequency, start, end, duration) 


def run_hourly(start, end): 
    frequency = 3600 
    duration = timedelta(minutes=30) 
    _run(_HOURLY_REQUEST, frequency, start, end, duration) 


def run_monthly(start, end): 
    frequency = 1209600 
    duration = timedelta(days=1) 
    _run(_MONTHLY_REQUEST, frequency, start, end, duration) 

data_requests.py

import pandas as pd 
from sqlalchemy import create_engine, exc 
from sqlalchemy.sql import text 

_DB = "mssql+pyodbc://[email protected]:[email protected]:1433/foo?driver=/usr/lib/x86_64-linux-gnu/odbc/libtdsodbc.so&tds_version=7.2" 
_ENGINE = create_engine(_DB) 


def get_bars(frequency): 
    query = text("""SELECT h.foo_id, s.id AS bar_id, u.timezone 
       FROM foo h 
       INNER JOIN bar s ON h.foo_id = s.foo_id 
       AND s.frequency <= :frequency 
       INNER JOIN test u ON u.id = h.test_id""") 
    params = { 
     "frequency": int(frequency) 
    } 

    try: 
     df = pd.read_sql(query, _ENGINE, params=params) 
    except exc.DBAPIError, e: 
     # If connection is invalid (e.g. database restarted) execute the query 
     # again 
     if e.connection_invalidated: 
      df = pd.read_sql(query, _ENGINE, params=params) 
     else: 
      raise e 

    return df 

我试过pymssql,但与最新版本出现segfaults(2.1.3,freetds的版本1.00.9)而pyodbc则不。我认为这是一个驱动程序问题,但我正试图解决它。另外,我遵循SQL Alchemy关于如何处理连接断开连接的文档,但是看着堆栈跟踪,它似乎没有达到逻辑。相反,它碰到else块并引发错误。

+0

该错误似乎来自堆栈比您的Python层更远。考虑到时间安排,SQL Server端是否有任何计划任务可以在特定时间创建锁定问题?错误中的SQL查询似乎不应该需要一段时间。当你运行'tsql -C'时,正在报告什么版本的TDS协议? – FlipperPA

+0

@FlipperPA不是我所知道的。运行tsql -C之后,它看起来可能是与pymssql一起安装的anaconda freetds与已安装的软件包管理器freetds之间的冲突。我会再运行一小时,看看是否是这个问题。 – ColinMc

+0

@FlipperPA我已经更新了一些更多的细节。看来这不是冲突问题。我查看了master数据库中的sys.event_log表,并且没有显示成功连接以外的任何信息。对我来说,它看起来像一个连接问题,但由于某些原因,我在SQL Alchemy文档中使用的代码不处理这种情况。 – ColinMc

回答

1

我找到了错误的原因。毕竟这似乎是一个编码问题。花了很长时间才弄清楚,但这与我如何用熊猫打电话有关。显然,熊猫在传递引擎实例时似乎并没有关闭连接。以下是我用来解决问题的代码。

def _get_df(query, params=None): 
    try: 
     with _ENGINE.begin() as conn: 
      df = pd.read_sql(query, conn, params=params) 
    except exc.DBAPIError, e: 
     # If connection is invalid (e.g. database restarted) execute the query 
     # again 
     if e.connection_invalidated: 
      with _ENGINE.begin() as conn: 
       df = pd.read_sql(query, conn, params=params) 
     else: 
      raise e 
    return df 
相关问题