How it works (5): Reading the TileStache source (A), the core framework
Published: 2019-05-27


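Introduction

As a map tile server with a longer maintenance history (from 2010 to the present), TileStache, written in Python, follows a different design philosophy from TileStrata, which is written in Node.js:

  • Comprehensive and specialized. It includes a good number of professional geospatial features, both commonly and rarely used.
  • Configuration file. Unlike TileStrata's code-as-configuration approach, TileStache drives the whole system from a standalone configuration file.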

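Features

TileStrata only implements a framework and relies entirely on plugins for functionality; TileStache, by contrast, ships with a considerable amount built in. Knowing what those features are is essential to understanding the architecture.

The features fall into categories similar to TileStrata's: data providers, caches, and image processing.
Data providers:

  • Mapnik
  • Proxy
  • Vector
  • MBTiles
  • Sandwich

Additional providers:

  • PostGeoJSON
  • Mapnik Grid
  • UtfGrid
  • TileDataOSM
  • SolrGeoJSON
  • MirrorOSM
  • Grid
  • GDAL

Caches:

  • Disk
  • Memcache
  • S3
  • Redis

Additional caches:

  • Monkeycache
  • GoogleCloud

Which features will be added later cannot be predicted, so TileStache's design also adopts plugin-style mounting similar to TileStrata's. This makes it easy to write more plugins and eases the maintenance burden on the main framework.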

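Structure

With all the concrete feature modules stripped away, what remains is TileStache's core framework file:

  • __init__.py

along with the modules that mount the different features.

__init__.py

This is the entry point of the whole framework and the module where TileStache handles network requests. By default it implements a WSGI-compatible server.

Starting TileStache is very simple: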

from werkzeug.serving import run_simple
import TileStache

# `options` is assumed to hold the parsed command-line arguments (file, ip, port)
app = TileStache.WSGITileServer(config=options.file, autoreload=True)
run_simple(options.ip, options.port, app)

Let's look at how this WSGITileServer is implemented:

class WSGITileServer:

    def __init__(self, config, autoreload=False):
        if is_string_type(config):
            self.autoreload = autoreload
            self.config_path = config
            # Parse the configuration file
            try:
                self.config = parseConfig(config)
            except:
                print("Error loading Tilestache config:")
                raise
        else:
            # When the configuration is an object, check its shape
            assert hasattr(config, 'cache'), 'Configuration object must have a cache.'
            assert hasattr(config, 'layers'), 'Configuration object must have layers.'
            assert hasattr(config, 'dirpath'), 'Configuration object must have a dirpath.'
            self.autoreload = False
            self.config_path = None
            self.config = config

    def __call__(self, environ, start_response):
        # Reload the configuration on every request
        if self.autoreload:
            try:
                self.config = parseConfig(self.config_path)
            except Exception as e:
                raise Core.KnownUnknown(
                    "Error loading Tilestache config file:\n%s" % str(e))

        # Try to parse the path information from the request
        try:
            layer, coord, ext = splitPathInfo(environ['PATH_INFO'])
        except Core.KnownUnknown as e:
            return self._response(start_response, 400, str(e))

        if layer and layer not in self.config.layers:
            return self._response(start_response, 404)

        path_info = environ.get('PATH_INFO', None)
        query_string = environ.get('QUERY_STRING', None)
        script_name = environ.get('SCRIPT_NAME', None)

        # Build the content for the request
        status_code, headers, content = requestHandler2(
            self.config, path_info, query_string, script_name)

        # Send the response
        return self._response(start_response, status_code, bytes(content), headers)

    def _response(self, start_response, code, content='', headers=None):
        headers = headers or Headers([])
        if content:
            headers.setdefault('Content-Length', str(len(content)))
        start_response('%d %s' % (code, httplib.responses[code]), headers.items())
        return [content]

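It really does just two things:

  • Initialization
    • Read the configuration: parseConfig(config)
  • Request handling:
    • Split the request parameters: splitPathInfo(environ['PATH_INFO'])
    • Return the corresponding content: requestHandler2(self.config, path_info, query_string, script_name)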
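
The two steps above can be sketched as a tiny WSGI callable. This is only an illustration under stated assumptions, not TileStache's code: TinyTileApp, split_path and the layers dictionary of plain functions are hypothetical names invented for this example.

```python
from http import HTTPStatus


def split_path(path_info):
    """Hypothetical stand-in for splitPathInfo: '/layer/z/x/y.ext' -> parts."""
    parts = path_info.strip('/').split('/')
    if len(parts) != 4 or '.' not in parts[3]:
        raise ValueError('Bad path: %r' % path_info)
    y, ext = parts[3].rsplit('.', 1)
    # int() raises ValueError on non-numeric segments, which maps to a 400
    return parts[0], (int(parts[1]), int(parts[2]), int(y)), ext


class TinyTileApp:
    """Minimal WSGI callable: parse the path, look up the layer, respond."""

    def __init__(self, layers):
        # layers maps a layer name to a function (zxy, ext) -> bytes
        self.layers = layers

    def __call__(self, environ, start_response):
        try:
            layer, zxy, ext = split_path(environ.get('PATH_INFO', ''))
        except ValueError as e:
            return self._response(start_response, 400, str(e).encode('utf-8'))
        if layer not in self.layers:
            return self._response(start_response, 404, b'')
        body = self.layers[layer](zxy, ext)
        return self._response(start_response, 200, body)

    def _response(self, start_response, code, content):
        status = '%d %s' % (code, HTTPStatus(code).phrase)
        headers = [('Content-Length', str(len(content)))]
        start_response(status, headers)
        return [content]
```

Any WSGI server can host such an object, which is why the real server can be started with werkzeug's run_simple.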

Initialization

Reading the configuration is a fairly involved job with a dedicated module, which is simply called from here:

from . import Config

def parseConfig(configHandle):
    if isinstance(configHandle, dict):
        config_dict = configHandle
        dirpath = '.'
    else:
        # Accept either a local or a remote configuration file
        scheme, host, path, p, q, f = urlparse(configHandle)
        if scheme == '':
            scheme = 'file'
            path = realpath(path)
        if scheme == 'file':
            with open(path) as file:
                config_dict = json_load(file)
        else:
            config_dict = json_load(urlopen(configHandle))
        dirpath = '%s://%s%s' % (scheme, host, dirname(path).rstrip('/') + '/')
    return Config.buildConfiguration(config_dict, dirpath)
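
The local-versus-remote decision in parseConfig comes down to the URL scheme. Here is a minimal standalone sketch of that idea; locate_config and load_local_config are hypothetical helpers written for this example, and the remote branch is deliberately left out.

```python
import json
from os.path import dirname, realpath
from urllib.parse import urlparse


def locate_config(handle):
    """Return (scheme, path) for a config handle, defaulting to a local file."""
    parts = urlparse(handle)
    scheme, path = parts.scheme, parts.path
    if scheme == '':
        # A bare path is treated as a local file and made absolute
        scheme, path = 'file', realpath(path)
    return scheme, path


def load_local_config(handle):
    """Load a JSON config from a local path; remote schemes are out of scope."""
    scheme, path = locate_config(handle)
    if scheme != 'file':
        raise ValueError('this sketch only reads local files, got %r' % scheme)
    with open(path) as f:
        return json.load(f), dirname(path)
```

The directory is returned alongside the dictionary because relative paths inside the configuration are later resolved against it.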

Request handling

Splitting the request parameters is simple:

_pathinfo_pat = re.compile(
    r'^/?(?P<l>\w.+)/(?P<z>\d+)/(?P<x>-?\d+)/(?P<y>-?\d+)\.(?P<e>\w+)$')
_preview_pat = re.compile(r'^/?(?P<l>\w.+)/(preview\.html)?$')

def splitPathInfo(pathinfo):
    if pathinfo == '/':
        return None, None, None

    if _pathinfo_pat.match(pathinfo or ''):
        path = _pathinfo_pat.match(pathinfo)
        # The URL has the form layer/z/x/y.ext; the groups are read out
        # in the order layer, y (row), x (column), z (zoom), extension
        layer, row, column, zoom, extension = [path.group(p) for p in 'lyxze']
        # Build the tile coordinate
        coord = Coordinate(int(row), int(column), int(zoom))

    elif _preview_pat.match(pathinfo or ''):
        # Return the layer's preview page
        path = _preview_pat.match(pathinfo)
        layer, extension = path.group('l'), 'html'
        coord = None

    else:
        raise Core.KnownUnknown(
            'Bad path: "{}". I was expecting something more like "/example/0/0/0.png"'.format(pathinfo))

    return layer, coord, extension
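
To see what the path pattern captures, here is a small standalone sketch. It assumes the named groups are l, z, x, y and e, which is what the 'lyxze' lookup in splitPathInfo implies; parse_tile_path is a hypothetical helper, and it returns a plain tuple instead of a Coordinate object.

```python
import re

# Named groups: l = layer, z = zoom, x = column, y = row, e = extension
PATH_PAT = re.compile(
    r'^/?(?P<l>\w.+)/(?P<z>\d+)/(?P<x>-?\d+)/(?P<y>-?\d+)\.(?P<e>\w+)$')


def parse_tile_path(path_info):
    """Return (layer, row, column, zoom, extension), or None if no match."""
    m = PATH_PAT.match(path_info or '')
    if m is None:
        return None
    # Same read-out order as splitPathInfo: layer, y, x, z, extension
    layer, row, column, zoom, ext = [m.group(g) for g in 'lyxze']
    return layer, int(row), int(column), int(zoom), ext
```

Under this pattern, '/osm/3/2/1.png' is read as zoom 3, column 2, row 1. Because the layer group is `\w.+`, a layer name may contain slashes but must be at least two characters long.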

Building the response content is the more complicated part:

from . import Core

def requestHandler2(config_hint, path_info, query_string=None, script_name=''):
    headers = Headers([])

    try:
        # Make sure path_info contains at least a "/"
        path_info = '/' + (path_info or '').lstrip('/')

        # Look up the requested layer
        layer = requestLayer(config_hint, path_info)
        query = parse_qs(query_string or '')
        try:
            callback = query['callback'][0]
        except KeyError:
            callback = None

        coord, extension = splitPathInfo(path_info)[1:]

        # Return the preview page for preview requests
        if extension == 'html' and coord is None:
            status_code, headers, content = getPreview(layer)

        # Handle redirects
        elif extension.lower() in layer.redirects:
            other_extension = layer.redirects[extension.lower()]
            redirect_uri = script_name
            redirect_uri += mergePathInfo(layer.name(), coord, other_extension)
            if query_string:
                redirect_uri += '?' + query_string
            headers['Location'] = redirect_uri
            headers['Content-Type'] = 'text/plain'
            return 302, headers, 'You are being redirected to %s\n' % redirect_uri

        else:
            # Fetch the tile
            status_code, headers, content = layer.getTileResponse(coord, extension)

        if layer.allowed_origin:
            headers.setdefault('Access-Control-Allow-Origin', layer.allowed_origin)

        if callback and 'json' in headers['Content-Type']:
            headers['Content-Type'] = 'application/javascript; charset=utf-8'
            content = '%s(%s)' % (callback, content)

        if layer.max_cache_age is not None:
            expires = datetime.utcnow() + timedelta(seconds=layer.max_cache_age)
            headers.setdefault('Expires', expires.strftime('%a, %d %b %Y %H:%M:%S GMT'))
            headers.setdefault('Cache-Control', 'public, max-age=%d' % layer.max_cache_age)

    except Core.KnownUnknown as e:
        out = StringIO()
        print('Known unknown!', file=out)
        print(e, file=out)
        print('', file=out)
        print('\n'.join(Core._rummy()), file=out)
        headers['Content-Type'] = 'text/plain'
        status_code, content = 500, out.getvalue().encode('ascii')

    return status_code, headers, content


def requestLayer(config, path_info):
    if is_string_type(config):
        key = hasattr(config, '__hash__') and (config, getcwd())
        # Reuse a previously parsed configuration to avoid parsing it again
        if key in _previous_configs:
            config = _previous_configs[key]
        # Otherwise parse it and remember the result
        else:
            config = parseConfig(config)
            if key:
                _previous_configs[key] = config
    else:
        assert hasattr(config, 'cache'), 'Configuration object must have a cache.'
        assert hasattr(config, 'layers'), 'Configuration object must have layers.'
        assert hasattr(config, 'dirpath'), 'Configuration object must have a dirpath.'

    path_info = '/' + (path_info or '').lstrip('/')
    if path_info == '/':
        return Core.Layer(config, None, None)

    # The layer name is parsed out yet again here; it is not clear why
    layername = splitPathInfo(path_info)[0]
    if layername not in config.layers:
        raise Core.KnownUnknown('"{}" is not a layer I know about. Here are some that I do know about: {}.'.format(
            layername, ', '.join(sorted(config.layers.keys()))))
    return config.layers[layername]


# Build the layer preview
def getPreview(layer):
    return 200, Headers([('Content-Type', 'text/html')]), Core._preview(layer)
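
Two small pieces of requestHandler2 are easy to try in isolation: the cache headers derived from max_cache_age and the JSONP wrapping. The sketch below pulls them out into two hypothetical helper functions (cache_headers and wrap_jsonp are names made up here) and passes the current time in as an argument so the result is reproducible.

```python
from datetime import datetime, timedelta


def cache_headers(max_cache_age, now):
    """Build Expires and Cache-Control values for a tile response."""
    if max_cache_age is None:
        return {}
    expires = now + timedelta(seconds=max_cache_age)
    return {
        'Expires': expires.strftime('%a, %d %b %Y %H:%M:%S GMT'),
        'Cache-Control': 'public, max-age=%d' % max_cache_age,
    }


def wrap_jsonp(callback, content_type, content):
    """Wrap a JSON body in a callback when one was requested."""
    if callback and 'json' in content_type:
        wrapped = '%s(%s)' % (callback, content)
        return 'application/javascript; charset=utf-8', wrapped
    return content_type, content
```

For example, cache_headers(3600, datetime(2019, 5, 27, 12, 0, 0)) gives an Expires value one hour later and a Cache-Control of 'public, max-age=3600'.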

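One thing here that I find hard to understand is that the same function is called repeatedly.

Take the path-parsing function. It is called once when splitting the request parameters, to get the layer name, extension and coordinate; again when building the response, to get the coordinate and extension; and once more when fetching the layer object, to get the layer name. These values could simply be passed between the functions as variables.

That is essentially everything the TileStache entry point does. Next come the two core parts.

We start from buildConfiguration, the method that __init__.py calls to load the configuration: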

def buildConfiguration(config_dict, dirpath='.'):
    scheme, h, path, p, q, f = urlparse(dirpath)

    # Add the configuration directory to the module search path (sys.path)
    if scheme in ('', 'file'):
        sys.path.insert(0, path)

    # Set up the cache; there can be only one, though it may be a multi-tier cache
    cache_dict = config_dict.get('cache', {})
    cache = _parseConfigCache(cache_dict, dirpath)

    # Create a new configuration object
    config = Configuration(cache, dirpath)

    # Set up every layer
    for (name, layer_dict) in config_dict.get('layers', {}).items():
        config.layers[name] = _parseConfigLayer(layer_dict, config, dirpath)

    # Set the logging level
    if 'logging' in config_dict:
        level = config_dict['logging'].upper()
        if hasattr(logging, level):
            logging.basicConfig(level=getattr(logging, level))

    return config

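It, too, really does just two things:

  • Set up the cache
  • Set up the layers

In the configuration file, the cache and the layers are configured like this: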

{
  "cache": {
    "name": "Test",
    "path": "/tmp/stache",
    "umask": "0000"
  },
  "layers": {
    "osm": {
      "provider": {"name": "proxy", "provider": "OPENSTREETMAP"},
      "png options": {"palette": "http://tilestache.org/example-palette-openstreetmap-mapnik.act"}
    },
    "example": {
      "provider": {"name": "mapnik", "mapfile": "examples/style.xml"},
      "projection": "spherical mercator"
    }
  }
}

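As a rough illustration of how such a configuration is consumed, the sketch below parses a trimmed copy of it and pulls out the values the loader cares about. summarize is a hypothetical helper written for this example; the default projection mirrors the 'spherical mercator' fallback in _parseConfigLayer.

```python
import json

CONFIG_TEXT = '''
{
  "cache": {"name": "Test", "path": "/tmp/stache", "umask": "0000"},
  "layers": {
    "osm": {"provider": {"name": "proxy", "provider": "OPENSTREETMAP"}},
    "example": {
      "provider": {"name": "mapnik", "mapfile": "examples/style.xml"},
      "projection": "spherical mercator"
    }
  }
}
'''


def summarize(config_dict):
    """Return the cache name and, per layer, (provider name, projection)."""
    cache_name = config_dict.get('cache', {}).get('name')
    layers = {}
    for name, layer_dict in config_dict.get('layers', {}).items():
        provider = layer_dict['provider'].get('name')
        # Layers without an explicit projection fall back to the default
        projection = layer_dict.get('projection', 'spherical mercator')
        layers[name] = (provider, projection)
    return cache_name, layers


config_dict = json.loads(CONFIG_TEXT)
cache_name, layers = summarize(config_dict)
# The umask is written as an octal string, so it is parsed with base 8
umask = int(config_dict['cache']['umask'], 8)
```
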
Setting up the cache

def _parseConfigCache(cache_dict, dirpath):
    if 'name' in cache_dict:
        # Look up the cache class by name
        _class = Caches.getCacheByName(cache_dict['name'])
        kwargs = {}

        # Copy the matching configuration values
        def add_kwargs(*keys):
            for key in keys:
                if key in cache_dict:
                    kwargs[key] = cache_dict[key]

        # Disk cache
        if _class is Caches.Disk:
            kwargs['path'] = enforcedLocalPath(cache_dict['path'], dirpath, 'Disk cache path')
            if 'umask' in cache_dict:
                kwargs['umask'] = int(cache_dict['umask'], 8)
            add_kwargs('dirs', 'gzip')

        # Multi-tier (composite) cache
        elif _class is Caches.Multi:
            kwargs['tiers'] = [_parseConfigCache(tier_dict, dirpath)
                               for tier_dict in cache_dict['tiers']]
        ......

    elif 'class' in cache_dict:
        # Handle extension caches that are not built in
        _class = Core.loadClassPath(cache_dict['class'])
        kwargs = cache_dict.get('kwargs', {})
        kwargs = dict( [(str(k), v) for (k, v) in kwargs.items()] )

    # Instantiate the cache
    cache = _class(**kwargs)
    return cache
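
The 'class' branch is what makes external caches pluggable: the configuration names a class, and the loader imports it and instantiates it with the given kwargs. Below is a minimal sketch of that mechanism. load_class_path and build_from_config are hypothetical helpers, and the 'module:ClassName' string format is an assumption made for this example rather than something shown in the code above.

```python
from importlib import import_module


def load_class_path(classpath):
    """Load a class from a 'package.module:ClassName' string (assumed format)."""
    module_name, _, class_name = classpath.partition(':')
    if not class_name:
        raise ValueError('expected "module:ClassName", got %r' % classpath)
    return getattr(import_module(module_name), class_name)


def build_from_config(entry):
    """Instantiate the class named in entry['class'] with entry['kwargs']."""
    cls = load_class_path(entry['class'])
    # Keyword names must be plain strings before they can be splatted
    kwargs = {str(k): v for k, v in entry.get('kwargs', {}).items()}
    return cls(**kwargs)
```

A standard-library class is enough to try it out: build_from_config({'class': 'datetime:timedelta', 'kwargs': {'seconds': 30}}) builds a timedelta of 30 seconds.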

Setting up the layers

def _parseConfigLayer(layer_dict, config, dirpath):
    # Resolve the projection
    projection = layer_dict.get('projection', 'spherical mercator')
    projection = Geography.getProjectionByName(projection)

    # Collect a series of cache-related parameters
    layer_kwargs = {}

    if 'cache lifespan' in layer_dict:
        layer_kwargs['cache_lifespan'] = int(layer_dict['cache lifespan'])

    if 'stale lock timeout' in layer_dict:
        layer_kwargs['stale_lock_timeout'] = int(layer_dict['stale lock timeout'])

    .........

    if 'preview' in layer_dict:
        preview_dict = layer_dict['preview']
        for (key, func) in zip(('lat', 'lon', 'zoom', 'ext'), (float, float, int, str)):
            if key in preview_dict:
                layer_kwargs['preview_' + key] = func(preview_dict[key])

    # Read the layer bounds
    if 'bounds' in layer_dict:
        if type(layer_dict['bounds']) is dict:
            layer_kwargs['bounds'] = _parseLayerBounds(layer_dict['bounds'], projection)
        elif type(layer_dict['bounds']) is list:
            bounds = [_parseLayerBounds(b, projection) for b in layer_dict['bounds']]
            layer_kwargs['bounds'] = BoundsList(bounds)
        else:
            raise Core.KnownUnknown('Layer bounds must be a dictionary, not: ' + dumps(layer_dict['bounds']))

    # Metatile
    # For background on metatiles, see https://www.geowebcache.org/docs/current/concepts/metatiles.html
    meta_dict = layer_dict.get('metatile', {})
    metatile_kwargs = {}
    for k in ('buffer', 'rows', 'columns'):
        if k in meta_dict:
            metatile_kwargs[k] = int(meta_dict[k])
    metatile = Core.Metatile(**metatile_kwargs)

    # JPEG/PNG save options
    jpeg_kwargs = {}
    png_kwargs = {}
    if 'jpeg options' in layer_dict:
        jpeg_kwargs = dict([(str(k), v) for (k, v) in layer_dict['jpeg options'].items()])
    if 'png options' in layer_dict:
        png_kwargs = dict([(str(k), v) for (k, v) in layer_dict['png options'].items()])

    # Image post-processing
    pixel_effect = None
    if 'pixel effect' in layer_dict:
        pixel_effect_dict = layer_dict['pixel effect']
        pixel_effect_name = pixel_effect_dict.get('name')
        if pixel_effect_name in PixelEffects.all:
            pixel_effect_kwargs = {}
            for k, v in pixel_effect_dict.items():
                if k != 'name':
                    pixel_effect_kwargs[str(k)] = float(v)
            PixelEffectClass = PixelEffects.all[pixel_effect_name]
            pixel_effect = PixelEffectClass(**pixel_effect_kwargs)

    # Bind the provider
    provider_dict = layer_dict['provider']

    # Load a built-in or an external provider
    if 'name' in provider_dict:
        _class = Providers.getProviderByName(provider_dict['name'])
        provider_kwargs = _class.prepareKeywordArgs(provider_dict)
    elif 'class' in provider_dict:
        _class = Core.loadClassPath(provider_dict['class'])
        provider_kwargs = provider_dict.get('kwargs', {})
        provider_kwargs = dict( [(str(k), v) for (k, v) in provider_kwargs.items()] )
    else:
        raise Exception('Missing required provider name or class: %s' % json_dumps(provider_dict))

    # Instantiate the layer
    layer = Core.Layer(config, projection, metatile, **layer_kwargs)
    layer.provider = _class(layer, **provider_kwargs)
    layer.setSaveOptionsJPEG(**jpeg_kwargs)
    layer.setSaveOptionsPNG(**png_kwargs)
    layer.pixel_effect = pixel_effect
    return layer

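As you can see, TileStache loads its various plugins by reading configuration. This is a very traditional approach, and it comes with a real annoyance: every new extension means one more case in the if chain.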
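
One common way to avoid a growing if chain is a registry in which each class is registered together with a function that turns the raw configuration into constructor arguments, similar in spirit to the prepareKeywordArgs call the provider branch already uses. The sketch below is not how TileStache is written; DiskCache, MemoryCache, register_cache and build_cache are all hypothetical names.

```python
class DiskCache:
    def __init__(self, path, umask=0o022):
        self.path, self.umask = path, umask


class MemoryCache:
    def __init__(self, limit=100):
        self.limit = limit


CACHE_REGISTRY = {}


def register_cache(name, cls, prepare=None):
    """Register a cache class plus an optional function that maps the
    raw config dict to constructor keyword arguments."""
    CACHE_REGISTRY[name.lower()] = (cls, prepare or (lambda d: {}))


def build_cache(cache_dict):
    """Look up the class by name and build it; no per-class branch needed."""
    cls, prepare = CACHE_REGISTRY[cache_dict['name'].lower()]
    return cls(**prepare(cache_dict))


register_cache('disk', DiskCache,
               lambda d: {'path': d['path'], 'umask': int(d.get('umask', '0022'), 8)})
register_cache('memory', MemoryCache,
               lambda d: {'limit': int(d.get('limit', 100))})
```

With this shape, adding a cache type means one more register_cache call next to the new class, and build_cache itself never changes.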

Recall how TileStrata does it:

strata.layer('basemap')
    .route('tile@2x.png')
        .use(disk.cache({dir: '/var/lib/tiles/basemap'}))
        .use(mapnik({
            pathname: '/path/to/map.xml',
            tileSize: 512,
            scale: 2
        }))

Yes: code as configuration. It is much more concise, and the implementation has no configuration-loading step at all.

The other core part is the Core module, which exists essentially to serve layers.

Back in __init__.py, where requests are handled, the tile is obtained by calling the following method:

status_code, headers, content = layer.getTileResponse(coord, extension)

Fetching tiles is arguably the whole reason a layer exists, so this method is the centerpiece of the Layer class:

    def getTileResponse(self, coord, extension, ignore_cached=False):
        # Record when handling starts
        start_time = time()

        # Map the extension to a MIME type and a format
        mimetype, format = self.getTypeByExtension(extension)

        # Default response values
        status_code = 200
        headers = Headers([('Content-Type', mimetype)])
        body = None

        cache = self.config.cache

        if not ignore_cached:
            # Try to read the tile from the cache
            try:
                body = cache.read(self, coord, format)
            except TheTileLeftANote as e:
                headers = e.headers
                status_code = e.status_code
                body = e.content
                if e.emit_content_type:
                    headers.setdefault('Content-Type', mimetype)
            tile_from = 'cache'
        else:
            # Look in the recent-tiles store instead
            # (arguably redundant when a Redis or Memcache cache is configured)
            body = _getRecentTile(self, coord, format)
            tile_from = 'recent tiles'

        # Nothing found so far, so dig deeper
        if body is None:
            try:
                lockCoord = None

                if self.write_cache:
                    # Lock this position in the cache so that the later write does not conflict
                    lockCoord = self.metatile.firstCoord(coord)
                    cache.lock(self, lockCoord, format)

                if not ignore_cached:
                    # With the lock held, check the cache once more:
                    # another process may have been writing the tile during the first read
                    body = cache.read(self, coord, format)
                    tile_from = 'cache after all'

                if body is None:
                    # No one else wrote the tile, do it here.
                    buff = BytesIO()

                    # Not in the cache, so render it directly
                    try:
                        tile = self.render(coord, format)
                        save = True
                    except NoTileLeftBehind as e:
                        # On NoTileLeftBehind the tile is still returned (with status 404) but not cached.
                        # This targets blank tiles, which need no caching and would only bloat the cache.
                        tile = e.tile
                        save = False
                        status_code = 404

                    if not self.write_cache:
                        save = False

                    if format.lower() == 'jpeg':
                        save_kwargs = self.jpeg_options
                    elif format.lower() == 'png':
                        save_kwargs = self.png_options
                    else:
                        save_kwargs = {}

                    # Write the rendered tile into the buffer
                    tile.save(buff, format, **save_kwargs)
                    body = buff.getvalue()

                    # Store the tile in the cache
                    if save:
                        cache.save(body, self, coord, format)

                    tile_from = 'layer.render()'

            except TheTileLeftANote as e:
                # Handle a tile that comes with a note
                headers = e.headers
                status_code = e.status_code
                body = e.content
                if e.emit_content_type:
                    headers.setdefault('Content-Type', mimetype)

            finally:
                if lockCoord:
                    # Release the lock on this cache position
                    cache.unlock(self, lockCoord, format)

        # Store the tile in the built-in in-memory cache
        _addRecentTile(self, coord, format, body)

        return status_code, headers, body
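
The heart of getTileResponse is a read, lock, read again, then render sequence. The sketch below reproduces only that pattern with an in-memory dictionary and thread locks. TinyTileCache and get_tile are hypothetical names, and a real tile cache would typically lock across processes rather than threads.

```python
import threading


class TinyTileCache:
    """In-memory cache with one lock per key (for illustration only)."""

    def __init__(self):
        self._tiles = {}
        self._locks = {}
        self._guard = threading.Lock()

    def read(self, key):
        return self._tiles.get(key)

    def save(self, key, body):
        self._tiles[key] = body

    def lock_for(self, key):
        # Create the per-key lock under a guard so two threads share it
        with self._guard:
            return self._locks.setdefault(key, threading.Lock())


def get_tile(cache, key, render):
    """Read; on a miss lock, read again, and only then render and save."""
    body = cache.read(key)
    if body is not None:
        return body, 'cache'
    with cache.lock_for(key):
        # Another worker may have written the tile while we waited
        body = cache.read(key)
        if body is not None:
            return body, 'cache after all'
        body = render(key)
        cache.save(key, body)
        return body, 'render'
```

The second read is what prevents two workers that miss at the same moment from both rendering the same tile.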

Inside it we can see a simple time-limited cache:

_recent_tiles = dict(hash={}, list=[])

# Add to the cache
def _addRecentTile(layer, coord, format, body, age=300):
    key = (layer, coord, format)
    # Time at which the entry expires
    due = time() + age

    # Store in the cache dictionary
    _recent_tiles['hash'][key] = body, due
    _recent_tiles['list'].append((key, due))

    # Walk the list from the oldest entry
    cutoff = 0
    for i, (key, due_by) in enumerate(_recent_tiles['list']):
        # Stop at the first entry that has not expired
        if time() < due_by:
            cutoff = i
            break

        # Drop the expired key
        try:
            del _recent_tiles['hash'][key]
        except KeyError:
            pass

    # Remove all expired entries from the list
    del _recent_tiles['list'][:cutoff]

# Read from the cache
def _getRecentTile(layer, coord, format):
    key = (layer, coord, format)
    body, use_by = _recent_tiles['hash'].get(key, (None, 0))

    if body is None:
        return None

    # Still fresh enough?
    if time() < use_by:
        return body

    # Too old, so delete it
    try:
        del _recent_tiles['hash'][key]
    except KeyError:
        pass

    return None

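Frankly, I don't think this is necessary. Every insert walks the list from the front to drop expired entries, and every read checks the current time against the expiry, and both cost resources. Also, because entries are removed only by age, a burst of new tiles in a short time will put pressure on memory.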
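
The memory concern can be addressed by capping the number of entries as well as their age. The sketch below is one possible shape, not something TileStache provides: RecentTiles is a hypothetical class, and the clock is injectable only so that the behaviour can be tested without sleeping. It still checks the time on every read, so it does not remove that cost.

```python
from collections import OrderedDict
from time import monotonic


class RecentTiles:
    """Expiring cache that is also capped by entry count (a sketch)."""

    def __init__(self, max_items=256, age=300, clock=monotonic):
        self.max_items, self.age, self.clock = max_items, age, clock
        self._items = OrderedDict()

    def add(self, key, body):
        # Re-adding a key moves it to the newest position
        self._items.pop(key, None)
        self._items[key] = (body, self.clock() + self.age)
        # Evict the oldest entries once the cap is exceeded
        while len(self._items) > self.max_items:
            self._items.popitem(last=False)

    def get(self, key):
        body, due = self._items.get(key, (None, 0))
        if body is None:
            return None
        if self.clock() < due:
            return body
        # Expired: drop it lazily on read
        del self._items[key]
        return None

    def __len__(self):
        return len(self._items)
```

Eviction here is constant time per insert, and the worst-case memory use is bounded by max_items regardless of how quickly tiles arrive.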

An important part of fetching a tile is rendering it:

    def render(self, coord, format):
        # If the requested tile lies outside the layer bounds, return a blank image
        if self.bounds and self.bounds.excludes(coord):
            raise NoTileLeftBehind(Image.new('RGBA', (self.dim, self.dim), (0, 0, 0, 0)))

        srs = self.projection.srs
        # Envelope (bounding box) of the requested tile
        xmin, ymin, xmax, ymax = self.envelope(coord)
        width, height = self.dim, self.dim

        provider = self.provider
        metatile = self.metatile
        pass_through = provider.pass_through if hasattr(provider, 'pass_through') else False

        # Metatile mode
        if self.doMetatile():
            # Expand the envelope to cover the whole metatile around this tile
            xmin, ymin, xmax, ymax = self.metaEnvelope(coord)
            width, height = self.metaSize(coord)
            subtiles = self.metaSubtiles(coord)

        # In some cases, rendering a tile together with its neighbours
        # gives a different result from rendering the tile alone
        if self.doMetatile() or hasattr(provider, 'renderArea'):
            # Render the surrounding area
            tile = provider.renderArea(width, height, srs, xmin, ymin, xmax, ymax, coord.zoom)
        elif hasattr(provider, 'renderTile'):
            # Render only the requested tile
            width, height = self.dim, self.dim
            tile = provider.renderTile(width, height, srs, coord)

        if self.bitmap_palette:
            # Apply the palette
            if format.lower() == 'png':
                t_index = self.png_options.get('transparency', None)
                tile = apply_palette(tile, self.bitmap_palette, t_index)

        if self.pixel_effect:
            # Apply the pixel effect
            if format.lower() in ('png', 'jpeg', 'tiff', 'bmp', 'gif'):
                tile = self.pixel_effect.apply(tile)

        if self.doMetatile():
            tile, surtile = None, tile
            # Cut the metatile into individual tiles
            for (other, x, y) in subtiles:
                buff = BytesIO()
                bbox = (x, y, x + self.dim, y + self.dim)
                subtile = surtile.crop(bbox)
                if self.palette256:
                    subtile = apply_palette256(subtile)
                subtile.save(buff, format)
                body = buff.getvalue()

                # Store every subtile in the cache
                if self.write_cache:
                    self.config.cache.save(body, self, other, format)

                if other == coord:
                    # Only the requested tile is returned
                    tile = subtile

                _addRecentTile(self, other, format, body)

        return tile

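Metatile

Tile rendering repeatedly mentions a metatile mode, which is a strategy for speeding up responses.

The idea behind metatiles rests on one assumption:

Any request for a tile is potentially followed by requests for the 8 tiles around it, because a map is always continuous.

Unlike a general-purpose cache, this is an optimization aimed specifically at the way maps are used.

The general flow is:

  1. The server receives a tile request.
  2. The server actually renders the map area covering that tile and its 8 surrounding tiles.
  3. It cuts that image into 9 small tiles along a 3×3 grid.
  4. It caches all 9 tiles.
  5. It returns only the requested tile, the one in the center.
  6. When a tile in any direction from that tile is requested afterwards, as the map loads its tiles in order, it is served straight from the cache.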
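
Step 3 above is plain arithmetic on pixel offsets. The sketch below follows the simplified picture in these steps, a 3×3 block centered on the requested tile; neighbourhood_boxes is a hypothetical helper, it ignores tiles at the edge of the map, and a real server may instead align metatiles to a fixed grid.

```python
def neighbourhood_boxes(row, column, dim=256):
    """For the 3x3 block centred on (row, column), map each tile's
    (row, column) to its crop box (left, top, right, bottom) in pixels."""
    boxes = {}
    for dr in (-1, 0, 1):
        for dc in (-1, 0, 1):
            # Pixel offset of this tile inside the big rendered image
            x, y = (dc + 1) * dim, (dr + 1) * dim
            boxes[(row + dr, column + dc)] = (x, y, x + dim, y + dim)
    return boxes
```

Each box has the same (x, y, x + dim, y + dim) shape as the bbox that render passes to crop, so the rendered image of 768×768 pixels yields nine 256×256 tiles.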

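In terms of time spent:

time to render 1 tile < time to render 1 metatile + time to cut it into 9 pieces < time to render 9 tiles
As soon as the user pans the map, the cache is hit and the speed-up takes effect.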

Reposted from: http://dgqws.baihongyu.com/
