Proto query latest table #89

Open: wants to merge 12 commits into master (showing changes from 9 commits)
1 change: 1 addition & 0 deletions api/datalake_api/app.py
@@ -23,6 +23,7 @@
from datalake_api import settings


logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)


54 changes: 44 additions & 10 deletions api/datalake_api/querier.py
@@ -14,9 +14,14 @@

from memoized_property import memoized_property
from datalake.common import DatalakeRecord
from boto3.dynamodb.conditions import Key
import base64
import json
import time
import os

import logging
log = logging.getLogger(__name__)


'''the maximum number of results to return to the user
@@ -172,9 +177,17 @@ def _unpack(self, result):

class ArchiveQuerier(object):

    def __init__(self, table_name,
                 latest_table_name=None,
                 use_latest_table=None,
                 latest_max_lookback=30,
                 dynamodb=None):
        self.table_name = table_name
        self.latest_table_name = latest_table_name
        self.use_latest_table = use_latest_table
        self.latest_max_lookback = latest_max_lookback
        self.dynamodb = dynamodb

    def query_by_work_id(self, work_id, what, where=None, cursor=None):
        kwargs = self._prepare_work_id_kwargs(work_id, what)
@@ -330,18 +343,27 @@ def _cursor_for_time_query(self, response, results, current_bucket):
    @memoized_property
    def _table(self):
        return self.dynamodb.Table(self.table_name)

    @memoized_property
    def _latest_table(self):
        return self.dynamodb.Table(self.latest_table_name)

    def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS):
        log.info('Inside query_latest method')
        if self.use_latest_table:
            log.info('inside use_latest_table=TRUE')
            response = self._latest_table.query(
                KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}')
            )
            items = response.get('Items', [])
            if not items:
                # Fall back to the time-bucket scan only when a lookback is allowed.
                if self.latest_max_lookback > 0:
                    return self._default_latest(what, where, lookback_days)
                return None
            latest_item = items[0]
            return dict(url=latest_item['url'],
                        metadata=latest_item['metadata'])
        else:
            return self._default_latest(what, where, lookback_days)

    def _get_latest_record_in_bucket(self, bucket, what, where):
        kwargs = self._prepare_time_bucket_kwargs(bucket, what)
@@ -365,3 +387,15 @@ def _get_all_records_in_bucket(self, bucket, **kwargs):
                break
            kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
        return records

    def _default_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS):
        current = int(time.time() * 1000)
        end = current - lookback_days * _ONE_DAY_MS
        while current >= end:
            bucket = current/DatalakeRecord.TIME_BUCKET_SIZE_IN_MS
            r = self._get_latest_record_in_bucket(bucket, what, where)
            if r is not None:
                return r
            current -= _ONE_DAY_MS

        return None
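For orientation, the latest-table read path reduces to a single DynamoDB key lookup on what_where_key. A minimal standalone sketch of that lookup, assuming a table named 'test_latest' whose items carry url and metadata attributes as query_latest expects (names and values here are illustrative):

import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
latest_table = dynamodb.Table('test_latest')  # illustrative table name

what, where = 'boo', 'hoo0'
response = latest_table.query(
    KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}')
)
items = response.get('Items', [])
if items:
    latest = items[0]
    print(latest['url'], latest['metadata'])
else:
    print('no latest record; fall back to the time-bucket scan')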
7 changes: 7 additions & 0 deletions api/datalake_api/settings.py
@@ -11,9 +11,16 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import os

# default settings

DYNAMODB_TABLE = 'test'
DYNAMODB_LATEST_TABLE = 'test_latest'
DATALAKE_USE_LATEST_TABLE = \
    os.environ.get("DATALAKE_USE_LATEST_TABLE", "false").lower() == "true"
LATEST_MAX_LOOKBACK = int(os.environ.get("LATEST_MAX_LOOKBACK", "30"))

AWS_REGION = 'us-west-2'
AWS_ACCESS_KEY_ID = None
AWS_SECRET_ACCESS_KEY = None
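To illustrate how the new settings react to the environment, here is a small self-contained example mirroring the parsing above (the exported values are made up):

import os

# Simulate the deployment environment; any casing of "true" enables the latest table.
os.environ['DATALAKE_USE_LATEST_TABLE'] = 'True'
os.environ['LATEST_MAX_LOOKBACK'] = '0'  # 0 disables the bucket-scan fallback

use_latest = os.environ.get('DATALAKE_USE_LATEST_TABLE', 'false').lower() == 'true'
max_lookback = int(os.environ.get('LATEST_MAX_LOOKBACK', '30'))
print(use_latest, max_lookback)  # True 0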
6 changes: 6 additions & 0 deletions api/datalake_api/v0.py
@@ -50,7 +50,13 @@ def get_dynamodb():
def get_archive_querier():
    if not hasattr(app, 'archive_querier'):
        table_name = app.config.get('DYNAMODB_TABLE')
        latest_table_name = app.config.get('DYNAMODB_LATEST_TABLE')
        use_latest_table = app.config.get('DATALAKE_USE_LATEST_TABLE')
        latest_max_lookback = app.config.get("LATEST_MAX_LOOKBACK")
        app.archive_querier = ArchiveQuerier(table_name,
                                             latest_table_name,
                                             use_latest_table,
                                             latest_max_lookback,
                                             dynamodb=get_dynamodb())
    return app.archive_querier

47 changes: 40 additions & 7 deletions api/tests/conftest.py
@@ -107,6 +107,20 @@ def tear_down():
}
]

latest_attribute_definitions = [
    {
        'AttributeName': 'what_where_key',
        'AttributeType': 'S'
    }
]

latest_key_schema = [
    {
        'AttributeName': 'what_where_key',
        'KeyType': 'HASH'
    }
]

global_secondary = [{
    'IndexName': 'work-id-index',
    'KeySchema': [
@@ -140,19 +154,24 @@ def _delete_table(table):
raise e


def _create_table(dynamodb,
                  table_name,
                  attribute_definitions,
                  key_schema,
                  global_secondary=None):
    table = dynamodb.Table(table_name)
    _delete_table(table)
    kwargs = dict(
        TableName=table_name,
        AttributeDefinitions=attribute_definitions,
        KeySchema=key_schema,
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )
    if global_secondary:
        kwargs['GlobalSecondaryIndexes'] = global_secondary
    dynamodb.create_table(**kwargs)
    return dynamodb.Table(table_name)

@@ -162,20 +181,27 @@ def _populate_table(table, records):
        for r in records:
            batch.put_item(Item=r)


# Create and populate the latest table alongside the main table so tests can
# exercise both paths. When the latest table is in use, query_latest looks up
# what:where directly, with no bucket logic.
@pytest.fixture
def table_maker(request, dynamodb):

    def maker(records):
        table_name = 'test'
        latest_table_name = 'test_latest'

        table = _create_table(dynamodb, table_name, attribute_definitions,
                              key_schema, global_secondary)
        latest_table = _create_table(dynamodb, latest_table_name,
                                     latest_attribute_definitions,
                                     latest_key_schema)

        _populate_table(latest_table, records)
        _populate_table(table, records)

        def tear_down():
            _delete_table(table)
            _delete_table(latest_table)

        request.addfinalizer(tear_down)
        return (table, latest_table)

    return maker

@@ -189,6 +215,13 @@ def maker(**kwargs):
        key = '/'.join([str(v) for v in kwargs.values()])
        url = 's3://datalake-test/' + key
        s3_file_from_metadata(url, m)
        records = DatalakeRecord.list_from_metadata(url, m)

        what = kwargs.get('what')
        where = kwargs.get('where')
        for record in records:
            record['what_where_key'] = f"{what}:{where}"

        return records

    return maker
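To make the fixture wiring concrete, here is a standalone sketch of writing one latest-style item the way _populate_table does, using the what:where hash key that record_maker stamps onto each record (the field values are made up):

import boto3

dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
latest_table = dynamodb.Table('test_latest')  # created with the latest_* schema above

what, where = 'boo', 'hoo0'
item = {
    'what_where_key': f'{what}:{where}',                 # hash key of the latest table
    'url': 's3://datalake-test/boo/hoo0/example-file',   # made-up url
    'metadata': {'what': what, 'where': where},          # trimmed-down metadata
}

with latest_table.batch_writer() as batch:  # same batch pattern as _populate_table
    batch.put_item(Item=item)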
65 changes: 59 additions & 6 deletions api/tests/test_archive_querier.py
@@ -123,11 +123,28 @@ def query_latest(self, what, where):
return HttpRecord(**record)


# TODO: incorporate LATEST_MAX_LOOKBACK here
@pytest.fixture(params=[
    ('archive', 'use_latest'),
    ('archive', 'use_default'),
    ('http', 'use_latest'),
    ('http', 'use_default')
], ids=['archive-latest', 'archive-default', 'http-latest', 'http-default'])
def querier(monkeypatch, request, dynamodb):
    querier_type, table_usage = request.param

    if table_usage == 'use_latest':
        monkeypatch.setenv('DATALAKE_USE_LATEST_TABLE', 'true')
    else:
        monkeypatch.setenv('DATALAKE_USE_LATEST_TABLE', 'false')

    if querier_type == 'http':
        return HttpQuerier('test', 'test_latest', dynamodb=dynamodb)
    else:
        return ArchiveQuerier('test', 'test_latest', dynamodb=dynamodb)


def in_url(result, part):
    url = result['url']
@@ -407,6 +424,10 @@ def test_no_end(table_maker, querier, s3_file_from_metadata):
    url = 's3://datalake-test/' + m['id']
    s3_file_from_metadata(url, m)
    records = DatalakeRecord.list_from_metadata(url, m)
    for record in records:
        what = record.get('what')
        where = record.get('where')
        record['what_where_key'] = f'{what}:{where}'
    table_maker(records)
    results = querier.query_by_time(m['start'], m['start'] + 1, m['what'])
    assert len(results) == 1
@@ -419,7 +440,12 @@ def test_no_end_exclusion(table_maker, querier, s3_file_from_metadata):
    url = 's3://datalake-test/' + m['id']
    s3_file_from_metadata(url, m)
    records = DatalakeRecord.list_from_metadata(url, m)
    for record in records:
        what = record.get('what')
        where = record.get('where')
        record['what_where_key'] = f'{what}:{where}'
    table_maker(records)

    results = querier.query_by_time(m['start'] + 1, m['start'] + 2, m['what'])
    assert len(results) == 0

@@ -478,8 +504,7 @@ def test_latest_creation_time_breaks_tie(table_maker, querier,
    start = bucket * DatalakeRecord.TIME_BUCKET_SIZE_IN_MS
    interval = DatalakeRecord.TIME_BUCKET_SIZE_IN_MS/150
    end = start + interval
    table = table_maker([])[0]
    for i in range(3):
        record = record_maker(start=start,
                              end=end,
@@ -528,3 +553,31 @@ def test_2x_max_results_in_one_bucket(table_maker, querier, record_maker):
    pages = get_all_pages(querier.query_by_time, [start, end, 'boo'])
    results = consolidate_pages(pages)
    assert len(results) == MAX_RESULTS * 2


def test_latest_table_query(table_maker, querier, record_maker):
    now = int(time.time() * 1000)
    records = []
    bucket = int(now/DatalakeRecord.TIME_BUCKET_SIZE_IN_MS)
    start = bucket * DatalakeRecord.TIME_BUCKET_SIZE_IN_MS
    end = start
    for i in range(MAX_RESULTS):
        records += record_maker(start=start,
                                end=end,
                                what='boo',
                                where='hoo{}'.format(i))
    table_maker(records)
    querier.use_latest_table = True
    result = querier.query_latest('boo', 'hoo0')
    _validate_latest_result(result, what='boo', where='hoo0')

"""
Write tests:
With setup of latest table records,
with DYNAMODB_LATEST_TABLE set, with DATALAKE_USE_LATEST_TABLE=true, with LATEST_MAX_LOOKBACK=0, record is found

With setup of latest table records,
with DYNAMODB_LATEST_TABLE set, with DATALAKE_USE_LATEST_TABLE=false, with LATEST_MAX_LOOKBACK=0, record is not found

2-4
"""
4 changes: 4 additions & 0 deletions api/tests/test_file.py
@@ -190,6 +190,10 @@ def maker(content, metadata):
        s3_file_maker('datalake-test', path, content, metadata)
        url = 's3://datalake-test/' + path
        records = DatalakeRecord.list_from_metadata(url, metadata)
        for record in records:
            what = record.get('what')
            where = record.get('where')
            record['what_where_key'] = f"{what}:{where}"
        table_maker(records)

    return maker
2 changes: 1 addition & 1 deletion ingester/datalake_ingester/storage.py
@@ -29,7 +29,7 @@ def __init__(self, table_name=None, latest_table=None, connection=None):
        self.table_name = table_name
        self.latest_table_name = os.environ.get("DATALAKE_DNAMODB_LATEST_TABLE",
                                                f"{latest_table}")
        self.use_latest_table = os.environ.get("DATALAKE_USE_LATEST_TABLE", False)
        self._prepare_connection(connection)
        self.logger = logging.getLogger('storage')

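For comparison, settings.py above derives the same flag by lower-casing the environment value and comparing it to "true"; a minimal sketch of that parsing, shown here only for illustration and not part of this diff:

import os

# Treat "false", "False", or an unset variable as disabled, matching settings.py.
use_latest_table = (
    os.environ.get('DATALAKE_USE_LATEST_TABLE', 'false').lower() == 'true'
)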