Proto query latest table #89

Open · wants to merge 12 commits into master
1 change: 1 addition & 0 deletions api/datalake_api/app.py
@@ -23,6 +23,7 @@
from datalake_api import settings


logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)


54 changes: 44 additions & 10 deletions api/datalake_api/querier.py
@@ -14,9 +14,14 @@

from memoized_property import memoized_property
from datalake.common import DatalakeRecord
from boto3.dynamodb.conditions import Key
import base64
import json
import time
import os

import logging
log = logging.getLogger(__name__)


'''the maximum number of results to return to the user
@@ -172,9 +177,15 @@ def _unpack(self, result):

class ArchiveQuerier(object):

def __init__(self, table_name, dynamodb=None):
def __init__(self, table_name,
latest_table_name=None,
use_latest_table=None,
dynamodb=None):
self.table_name = table_name
self.latest_table_name = latest_table_name
self.use_latest_table = use_latest_table
self.dynamodb = dynamodb


def query_by_work_id(self, work_id, what, where=None, cursor=None):
kwargs = self._prepare_work_id_kwargs(work_id, what)
@@ -330,18 +341,28 @@ def _cursor_for_time_query(self, response, results, current_bucket):
@memoized_property
def _table(self):
return self.dynamodb.Table(self.table_name)

@memoized_property
def _latest_table(self):
return self.dynamodb.Table(self.latest_table_name)

def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS):
current = int(time.time() * 1000)
end = current - lookback_days * _ONE_DAY_MS
while current >= end:
bucket = current/DatalakeRecord.TIME_BUCKET_SIZE_IN_MS
r = self._get_latest_record_in_bucket(bucket, what, where)
if r is not None:
return r
current -= _ONE_DAY_MS
if self.use_latest_table:
log.info('querying latest table for %s:%s', what, where)
response = self._latest_table.query(
KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}')
)
items = response.get('Items', [])

if not items:
log.info('Falling back to default latest query')
return self._default_latest(what, where, lookback_days)

return None
latest_item = items[0]
return dict(url=latest_item['url'], metadata=latest_item['metadata'])

else:
return self._default_latest(what, where, lookback_days)

def _get_latest_record_in_bucket(self, bucket, what, where):
kwargs = self._prepare_time_bucket_kwargs(bucket, what)
@@ -365,3 +386,16 @@ def _get_all_records_in_bucket(self, bucket, **kwargs):
break
kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
return records

def _default_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS):
log.info("Using default latest behavior")
current = int(time.time() * 1000)
end = current - lookback_days * _ONE_DAY_MS
while current >= end:
bucket = current/DatalakeRecord.TIME_BUCKET_SIZE_IN_MS
r = self._get_latest_record_in_bucket(bucket, what, where)
if r is not None:
return r
current -= _ONE_DAY_MS

return None
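
For reference, the new query path assumes each latest-table item is keyed solely on what_where_key and carries at least url and metadata, matching what query_latest returns. A minimal sketch of a conforming item and write, assuming a boto3 DynamoDB resource and the test_latest table from settings (the writer side is not part of this diff, and the concrete values below are made up):

import boto3

dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
latest_table = dynamodb.Table('test_latest')

# One item per (what, where) pair; the hash key packs both values.
item = {
    'what_where_key': 'syslog:webserver01',          # hypothetical what:where pair
    'url': 's3://datalake-test/syslog-webserver01',  # hypothetical file url
    'metadata': {
        'what': 'syslog',
        'where': 'webserver01',
        'start': 1700000000000,
        'end': 1700000060000,
    },
}
latest_table.put_item(Item=item)

# query_latest('syslog', 'webserver01') would then read this back with
# Key('what_where_key').eq('syslog:webserver01') and return its url and metadata.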
5 changes: 5 additions & 0 deletions api/datalake_api/settings.py
@@ -11,9 +11,14 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import os

# default settings

DYNAMODB_TABLE = 'test'
DYNAMODB_LATEST_TABLE = 'test_latest'
DATALAKE_USE_LATEST_TABLE = False

AWS_REGION = 'us-west-2'
AWS_ACCESS_KEY_ID = None
AWS_SECRET_ACCESS_KEY = None
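
The hunk also adds import os, though the os-based wiring sits outside the shown context. One way a deployment might override the new defaults from the environment, purely as a sketch (the environment variable names here are assumptions, not taken from this diff):

import os

DYNAMODB_LATEST_TABLE = os.environ.get('DATALAKE_DYNAMODB_LATEST_TABLE', 'test_latest')
DATALAKE_USE_LATEST_TABLE = os.environ.get('DATALAKE_USE_LATEST_TABLE', 'false').lower() == 'true'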
19 changes: 16 additions & 3 deletions api/datalake_api/v0.py
@@ -27,6 +27,7 @@

v0 = flask.Blueprint('v0', __name__, url_prefix='/v0')

_archive_querier = None

def _get_aws_kwargs():
kwargs = dict(
@@ -48,11 +49,23 @@ def get_dynamodb():


def get_archive_querier():
if not hasattr(app, 'archive_querier'):
global _archive_querier

if not _archive_querier:
table_name = app.config.get('DYNAMODB_TABLE')
app.archive_querier = ArchiveQuerier(table_name,
latest_table_name = app.config.get('DYNAMODB_LATEST_TABLE')
use_latest_table = app.config.get('DATALAKE_USE_LATEST_TABLE')
_archive_querier = ArchiveQuerier(table_name,
latest_table_name,
use_latest_table,
dynamodb=get_dynamodb())
return app.archive_querier
return _archive_querier


def reset_archive_querier():
"""FOR TESTING PURPOSES ONLY"""
global _archive_querier
_archive_querier = None
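
Because the querier is now cached in a module-level global rather than on the Flask app object, tests that flip DATALAKE_USE_LATEST_TABLE have to drop the cached instance before building a new one. A minimal usage sketch, mirroring the querier fixture in api/tests/test_archive_querier.py:

from datalake_api import settings
from datalake_api.v0 import reset_archive_querier

settings.DATALAKE_USE_LATEST_TABLE = True
reset_archive_querier()  # the next get_archive_querier() call rebuilds with the new setting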


@v0.route('/archive/')
47 changes: 41 additions & 6 deletions api/tests/conftest.py
@@ -42,6 +42,9 @@


def get_client():
from datalake_api import settings
datalake_api.app.config.from_object(settings)

datalake_api.app.config['TESTING'] = True
datalake_api.app.config['AWS_ACCESS_KEY_ID'] = 'abc'
datalake_api.app.config['AWS_SECRET_ACCESS_KEY'] = '123'
@@ -107,6 +110,20 @@ def tear_down():
}
]

latest_attribute_definitions = [
{
'AttributeName': 'what_where_key',
'AttributeType': 'S'
}
]

latest_key_schema = [
{
'AttributeName': 'what_where_key',
'KeyType': 'HASH'
}
]

global_secondary = [{
'IndexName': 'work-id-index',
'KeySchema': [
@@ -140,19 +157,24 @@ def _delete_table(table):
raise e


def _create_table(dynamodb, table_name):
def _create_table(dynamodb,
table_name,
attribute_definitions,
key_schema,
global_secondary=None):
table = dynamodb.Table(table_name)
_delete_table(table)
kwargs = dict(
TableName=table_name,
AttributeDefinitions=attribute_definitions,
KeySchema=key_schema,
GlobalSecondaryIndexes=global_secondary,
ProvisionedThroughput={
'ReadCapacityUnits': 5,
'WriteCapacityUnits': 5
}
)
if global_secondary:
kwargs['GlobalSecondaryIndexes'] = global_secondary
dynamodb.create_table(**kwargs)
return dynamodb.Table(table_name)

@@ -168,14 +190,20 @@ def table_maker(request, dynamodb):

def maker(records):
table_name = 'test'
table = _create_table(dynamodb, table_name)
latest_table_name = 'test_latest'

table = _create_table(dynamodb, table_name, attribute_definitions, key_schema, global_secondary)
latest_table = _create_table(dynamodb, latest_table_name, latest_attribute_definitions, latest_key_schema)

_populate_table(latest_table, records)
_populate_table(table, records)

def tear_down():
_delete_table(table)
request.addfinalizer(tear_down)
_delete_table(latest_table)

return table
request.addfinalizer(tear_down)
return (table, latest_table)

return maker

@@ -189,6 +217,13 @@ def maker(**kwargs):
key = '/'.join([str(v) for v in kwargs.values()])
url = 's3://datalake-test/' + key
s3_file_from_metadata(url, m)
return DatalakeRecord.list_from_metadata(url, m)
records = DatalakeRecord.list_from_metadata(url, m)

what = kwargs.get('what')
where = kwargs.get('where')
for record in records:
record['what_where_key'] = f"{what}:{where}"

return records

return maker
85 changes: 78 additions & 7 deletions api/tests/test_archive_querier.py
@@ -11,8 +11,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

import os
import pytest
from datalake_api.v0 import reset_archive_querier
from datalake_api import settings
from datalake.common import DatalakeRecord
from datalake.tests import generate_random_metadata
import simplejson as json
@@ -123,11 +126,36 @@ def query_latest(self, what, where):
return HttpRecord(**record)


@pytest.fixture(params=[ArchiveQuerier, HttpQuerier],
ids=['archive_querier', 'http'])
def querier(request, dynamodb):
return request.param('test', dynamodb=dynamodb)

@pytest.fixture(params=[
('archive', 'use_latest'),
('archive', 'use_default'),
('http', 'use_latest'),
('http', 'use_default')
], ids=['archive-latest',
'archive-default',
'http-latest',
'http-default'
])
def querier(monkeypatch, request, dynamodb):

reset_archive_querier()
querier_type, table_usage = request.param

if table_usage == 'use_latest':
settings.DATALAKE_USE_LATEST_TABLE = True
else:
settings.DATALAKE_USE_LATEST_TABLE = False

if querier_type == 'http':
return HttpQuerier('test',
'test_latest',
dynamodb=dynamodb)
else:
return ArchiveQuerier('test',
'test_latest',
use_latest_table=(table_usage == 'use_latest'),
dynamodb=dynamodb)

def in_url(result, part):
url = result['url']
@@ -407,6 +435,10 @@ def test_no_end(table_maker, querier, s3_file_from_metadata):
url = 's3://datalake-test/' + m['id']
s3_file_from_metadata(url, m)
records = DatalakeRecord.list_from_metadata(url, m)
for record in records:
what = record.get('what')
where = record.get('where')
record['what_where_key'] = f'{what}:{where}'
table_maker(records)
results = querier.query_by_time(m['start'], m['start'] + 1, m['what'])
assert len(results) == 1
@@ -419,7 +451,12 @@ def test_no_end_exclusion(table_maker, querier, s3_file_from_metadata):
url = 's3://datalake-test/' + m['id']
s3_file_from_metadata(url, m)
records = DatalakeRecord.list_from_metadata(url, m)
for record in records:
what = record.get('what')
where = record.get('where')
record['what_where_key'] = f'{what}:{where}'
table_maker(records)

results = querier.query_by_time(m['start'] + 1, m['start'] + 2, m['what'])
assert len(results) == 0

@@ -478,8 +515,7 @@ def test_latest_creation_time_breaks_tie(table_maker, querier,
start = bucket * DatalakeRecord.TIME_BUCKET_SIZE_IN_MS
interval = DatalakeRecord.TIME_BUCKET_SIZE_IN_MS/150
end = start + interval
table = table_maker([])

table = table_maker([])[0]
for i in range(3):
record = record_maker(start=start,
end=end,
@@ -528,3 +564,38 @@ def test_2x_max_results_in_one_bucket(table_maker, querier, record_maker):
pages = get_all_pages(querier.query_by_time, [start, end, 'boo'])
results = consolidate_pages(pages)
assert len(results) == MAX_RESULTS * 2


def test_latest_table_query(table_maker, querier, record_maker):
now = int(time.time() * 1000)
records = []
bucket = int(now/DatalakeRecord.TIME_BUCKET_SIZE_IN_MS)
start = bucket * DatalakeRecord.TIME_BUCKET_SIZE_IN_MS
end = start
for i in range(MAX_RESULTS):
records += record_maker(start=start,
end=end,
what='boo',
where='hoo{}'.format(i))
table_maker(records)
result = querier.query_latest('boo', 'hoo0')
_validate_latest_result(result, what='boo', where='hoo0')


def test_query_latest_just_latest_table(table_maker, querier, record_maker):
use_latest_from_env = settings.DATALAKE_USE_LATEST_TABLE
table = table_maker([])[1]
for i in range(3):
record = record_maker(what='meow',
where='tree',
path='/{}'.format(i))

# only inserting into latest table
table.put_item(Item=record[0])
time.sleep(1.01)

result = querier.query_latest('meow', 'tree')
if use_latest_from_env:
_validate_latest_result(result, what='meow', where='tree')
else:
assert result is None
4 changes: 4 additions & 0 deletions api/tests/test_file.py
@@ -190,6 +190,10 @@ def maker(content, metadata):
s3_file_maker('datalake-test', path, content, metadata)
url = 's3://datalake-test/' + path
records = DatalakeRecord.list_from_metadata(url, metadata)
for record in records:
what = record.get('what')
where = record.get('where')
record['what_where_key'] = f"{what}:{where}"
table_maker(records)

return maker