summaryrefslogtreecommitdiffstats
path: root/meta/recipes-core/meta/cve-update-db-native.bb
blob: 575254af40cc03f8843b3c58011cfbfc564fd7f7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
SUMMARY = "Updates the NVD CVE database"
LICENSE = "MIT"

INHIBIT_DEFAULT_DEPS = "1"

inherit native

deltask do_unpack
deltask do_patch
deltask do_configure
deltask do_compile
deltask do_install
deltask do_populate_sysroot

python () {
    if not d.getVar("CVE_CHECK_DB_FILE"):
        raise bb.parse.SkipRecipe("Skip recipe when cve-check class is not loaded.")
}

python do_populate_cve_db() {
    """
    Update NVD database with json data feed
    """
    import bb.utils
    import sqlite3, urllib, urllib.parse, shutil, gzip
    from datetime import date

    bb.utils.export_proxies(d)

    BASE_URL = "https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-"
    YEAR_START = 2002

    db_file = d.getVar("CVE_CHECK_DB_FILE")
    db_dir = os.path.dirname(db_file)
    json_tmpfile = os.path.join(db_dir, 'nvd.json.gz')

    # Don't refresh the database more than once an hour
    try:
        import time
        if time.time() - os.path.getmtime(db_file) < (60*60):
            return
    except OSError:
        pass

    cve_f = open(os.path.join(d.getVar("TMPDIR"), 'cve_check'), 'a')

    if not os.path.isdir(db_dir):
        os.mkdir(db_dir)

    # Connect to database
    conn = sqlite3.connect(db_file)
    c = conn.cursor()

    initialize_db(c)

    for year in range(YEAR_START, date.today().year + 1):
        year_url = BASE_URL + str(year)
        meta_url = year_url + ".meta"
        json_url = year_url + ".json.gz"

        # Retrieve meta last modified date
        response = urllib.request.urlopen(meta_url)
        if response:
            for l in response.read().decode("utf-8").splitlines():
                key, value = l.split(":", 1)
                if key == "lastModifiedDate":
                    last_modified = value
                    break
            else:
                bb.warn("Cannot parse CVE metadata, update failed")
                return

        # Compare with current db last modified date
        c.execute("select DATE from META where YEAR = ?", (year,))
        meta = c.fetchone()
        if not meta or meta[0] != last_modified:
            # Clear products table entries corresponding to current year
            c.execute("delete from PRODUCTS where ID like ?", ('CVE-%d%%' % year,))

            # Update db with current year json file
            try:
                response = urllib.request.urlopen(json_url)
                if response:
                    update_db(c, gzip.decompress(response.read()).decode('utf-8'))
                c.execute("insert or replace into META values (?, ?)", [year, last_modified])
            except urllib.error.URLError as e:
                cve_f.write('Warning: CVE db update error, CVE data is outdated.\n\n')
                bb.warn("Cannot parse CVE data (%s), update failed" % e.reason)
                return

        # Update success, set the date to cve_check file.
        if year == date.today().year:
            cve_f.write('CVE database update : %s\n\n' % date.today())

    cve_f.close()
    conn.commit()
    conn.close()
}

def initialize_db(c):
    c.execute("CREATE TABLE IF NOT EXISTS META (YEAR INTEGER UNIQUE, DATE TEXT)")

    c.execute("CREATE TABLE IF NOT EXISTS NVD (ID TEXT UNIQUE, SUMMARY TEXT, \
        SCOREV2 TEXT, SCOREV3 TEXT, MODIFIED INTEGER, VECTOR TEXT)")

    c.execute("CREATE TABLE IF NOT EXISTS PRODUCTS (ID TEXT, \
        VENDOR TEXT, PRODUCT TEXT, VERSION_START TEXT, OPERATOR_START TEXT, \
        VERSION_END TEXT, OPERATOR_END TEXT)")
    c.execute("CREATE INDEX IF NOT EXISTS PRODUCT_ID_IDX on PRODUCTS(ID);")

def parse_node_and_insert(c, node, cveId):
    # Parse children node if needed
    for child in node.get('children', ()):
        parse_node_and_insert(c, child, cveId)

    def cpe_generator():
        for cpe in node.get('cpe_match', ()):
            if not cpe['vulnerable']:
                return
            cpe23 = cpe['cpe23Uri'].split(':')
            vendor = cpe23[3]
            product = cpe23[4]
            version = cpe23[5]

            if version != '*':
                # Version is defined, this is a '=' match
                yield [cveId, vendor, product, version, '=', '', '']
            else:
                # Parse start version, end version and operators
                op_start = ''
                op_end = ''
                v_start = ''
                v_end = ''

                if 'versionStartIncluding' in cpe:
                    op_start = '>='
                    v_start = cpe['versionStartIncluding']

                if 'versionStartExcluding' in cpe:
                    op_start = '>'
                    v_start = cpe['versionStartExcluding']

                if 'versionEndIncluding' in cpe:
                    op_end = '<='
                    v_end = cpe['versionEndIncluding']

                if 'versionEndExcluding' in cpe:
                    op_end = '<'
                    v_end = cpe['versionEndExcluding']

                yield [cveId, vendor, product, v_start, op_start, v_end, op_end]

    c.executemany("insert into PRODUCTS values (?, ?, ?, ?, ?, ?, ?)", cpe_generator())

def update_db(c, jsondata):
    import json
    root = json.loads(jsondata)

    for elt in root['CVE_Items']:
        if not elt['impact']:
            continue

        cveId = elt['cve']['CVE_data_meta']['ID']
        cveDesc = elt['cve']['description']['description_data'][0]['value']
        date = elt['lastModifiedDate']
        accessVector = elt['impact']['baseMetricV2']['cvssV2']['accessVector']
        cvssv2 = elt['impact']['baseMetricV2']['cvssV2']['baseScore']

        try:
            cvssv3 = elt['impact']['baseMetricV3']['cvssV3']['baseScore']
        except:
            cvssv3 = 0.0

        c.execute("insert or replace into NVD values (?, ?, ?, ?, ?, ?)",
                [cveId, cveDesc, cvssv2, cvssv3, date, accessVector])

        configurations = elt['configurations']['nodes']
        for config in configurations:
            parse_node_and_insert(c, config, cveId)


addtask do_populate_cve_db before do_fetch
do_populate_cve_db[nostamp] = "1"

EXCLUDE_FROM_WORLD = "1"